diff --git a/apps/workspace-engine/main.go b/apps/workspace-engine/main.go index b0b32bffb..eb55a43ae 100644 --- a/apps/workspace-engine/main.go +++ b/apps/workspace-engine/main.go @@ -16,6 +16,7 @@ import ( "workspace-engine/svc/controllers/deploymentresourceselectoreval" "workspace-engine/svc/controllers/desiredrelease" "workspace-engine/svc/controllers/environmentresourceselectoreval" + "workspace-engine/svc/controllers/forcedeploy" "workspace-engine/svc/controllers/jobdispatch" "workspace-engine/svc/controllers/jobeligibility" "workspace-engine/svc/controllers/jobverificationmetric" @@ -48,6 +49,7 @@ func main() { deploymentplanresult.New(WorkerID, db.GetPool(ctx)), deploymentresourceselectoreval.New(WorkerID, db.GetPool(ctx)), environmentresourceselectoreval.New(WorkerID, db.GetPool(ctx)), + forcedeploy.New(WorkerID, db.GetPool(ctx)), jobdispatch.New(WorkerID, db.GetPool(ctx)), jobeligibility.New(WorkerID, db.GetPool(ctx)), jobverificationmetric.New(WorkerID, db.GetPool(ctx)), diff --git a/apps/workspace-engine/pkg/reconcile/events/forcedeploy.go b/apps/workspace-engine/pkg/reconcile/events/forcedeploy.go new file mode 100644 index 000000000..a9f6e20a9 --- /dev/null +++ b/apps/workspace-engine/pkg/reconcile/events/forcedeploy.go @@ -0,0 +1,3 @@ +package events + +const ForceDeployKind = "force-deploy" diff --git a/apps/workspace-engine/svc/controllers/forcedeploy/controller.go b/apps/workspace-engine/svc/controllers/forcedeploy/controller.go new file mode 100644 index 000000000..db0db6f80 --- /dev/null +++ b/apps/workspace-engine/svc/controllers/forcedeploy/controller.go @@ -0,0 +1,103 @@ +package forcedeploy + +import ( + "context" + "fmt" + "time" + + "github.com/charmbracelet/log" + "github.com/jackc/pgx/v5/pgxpool" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "workspace-engine/pkg/config" + "workspace-engine/pkg/reconcile" + "workspace-engine/pkg/reconcile/events" + "workspace-engine/pkg/reconcile/postgres" + "workspace-engine/svc" +) + +var tracer = otel.Tracer("workspace-engine/svc/controllers/forcedeploy") +var _ reconcile.Processor = (*Controller)(nil) + +type Controller struct { + getter Getter + setter Setter +} + +func (c *Controller) Process(ctx context.Context, item reconcile.Item) (reconcile.Result, error) { + ctx, span := tracer.Start(ctx, "forcedeploy.Controller.Process") + defer span.End() + + span.SetAttributes( + attribute.Int64("item.id", item.ID), + attribute.String("item.scope_id", item.ScopeID), + ) + + rt, err := NewReleaseTarget(item.ScopeID) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return reconcile.Result{}, fmt.Errorf("parse release target: %w", err) + } + + exists, err := c.getter.ReleaseTargetExists(ctx, rt) + if err != nil { + return reconcile.Result{}, fmt.Errorf("check release target exists: %w", err) + } + if !exists { + return reconcile.Result{}, nil + } + + result, err := Reconcile(ctx, item.WorkspaceID, c.getter, c.setter, rt) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return reconcile.Result{}, fmt.Errorf("reconcile force deploy: %w", err) + } + + if result.RequeueAfter > 0 { + return reconcile.Result{RequeueAfter: result.RequeueAfter}, nil + } + + return reconcile.Result{}, nil +} + +// NewController creates a Controller with the given dependencies. +// Use this constructor in tests to inject mock implementations. +func NewController(getter Getter, setter Setter) *Controller { + return &Controller{getter: getter, setter: setter} +} + +func New(workerID string, pgxPool *pgxpool.Pool) svc.Service { + if pgxPool == nil { + log.Fatal("Failed to get pgx pool") + panic("failed to get pgx pool") + } + + kind := events.ForceDeployKind + maxConcurrency := config.GetMaxConcurrency(kind) + + nodeConfig := reconcile.NodeConfig{ + WorkerID: workerID, + BatchSize: 10, + PollInterval: 1 * time.Second, + LeaseDuration: 10 * time.Second, + LeaseHeartbeat: 5 * time.Second, + MaxConcurrency: maxConcurrency, + MaxRetryBackoff: 10 * time.Second, + } + + queue := postgres.NewForKinds(pgxPool, kind) + controller := &Controller{ + getter: &PostgresGetter{}, + setter: &PostgresSetter{}, + } + + worker, err := reconcile.NewWorker(kind, queue, controller, nodeConfig) + if err != nil { + log.Fatal("Failed to create force deploy reconcile worker", "error", err) + } + + return worker +} diff --git a/apps/workspace-engine/svc/controllers/forcedeploy/getters.go b/apps/workspace-engine/svc/controllers/forcedeploy/getters.go new file mode 100644 index 000000000..666b72771 --- /dev/null +++ b/apps/workspace-engine/svc/controllers/forcedeploy/getters.go @@ -0,0 +1,18 @@ +package forcedeploy + +import ( + "context" + + "github.com/google/uuid" + "workspace-engine/pkg/oapi" +) + +type Getter interface { + ReleaseTargetExists(ctx context.Context, rt *ReleaseTarget) (bool, error) + GetDesiredRelease(ctx context.Context, rt *ReleaseTarget) (*oapi.Release, error) + GetActiveJobsForReleaseTarget(ctx context.Context, rt *oapi.ReleaseTarget) ([]*oapi.Job, error) + GetDeployment(ctx context.Context, deploymentID uuid.UUID) (*oapi.Deployment, error) + GetEnvironment(ctx context.Context, environmentID uuid.UUID) (*oapi.Environment, error) + GetResource(ctx context.Context, resourceID uuid.UUID) (*oapi.Resource, error) + ListJobAgentsByWorkspaceID(ctx context.Context, workspaceID uuid.UUID) ([]oapi.JobAgent, error) +} diff --git a/apps/workspace-engine/svc/controllers/forcedeploy/getters_postgres.go b/apps/workspace-engine/svc/controllers/forcedeploy/getters_postgres.go new file mode 100644 index 000000000..8ac9d1a76 --- /dev/null +++ b/apps/workspace-engine/svc/controllers/forcedeploy/getters_postgres.go @@ -0,0 +1,125 @@ +package forcedeploy + +import ( + "context" + "errors" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "workspace-engine/pkg/db" + "workspace-engine/pkg/oapi" +) + +var _ Getter = (*PostgresGetter)(nil) + +type PostgresGetter struct{} + +func (g *PostgresGetter) ReleaseTargetExists(ctx context.Context, rt *ReleaseTarget) (bool, error) { + return db.GetQueries(ctx).ReleaseTargetExists(ctx, db.ReleaseTargetExistsParams{ + DeploymentID: rt.DeploymentID, + EnvironmentID: rt.EnvironmentID, + ResourceID: rt.ResourceID, + }) +} + +func (g *PostgresGetter) GetDesiredRelease( + ctx context.Context, + rt *ReleaseTarget, +) (*oapi.Release, error) { + row, err := db.GetQueries(ctx). + GetDesiredReleaseByReleaseTarget(ctx, db.GetDesiredReleaseByReleaseTargetParams{ + ResourceID: rt.ResourceID, + EnvironmentID: rt.EnvironmentID, + DeploymentID: rt.DeploymentID, + }) + if errors.Is(err, pgx.ErrNoRows) { + return nil, nil + } + if err != nil { + return nil, err + } + return db.ToOapiFullRelease(row), nil +} + +func (g *PostgresGetter) GetActiveJobsForReleaseTarget( + ctx context.Context, + rt *oapi.ReleaseTarget, +) ([]*oapi.Job, error) { + deploymentID, err := uuid.Parse(rt.DeploymentId) + if err != nil { + return nil, err + } + environmentID, err := uuid.Parse(rt.EnvironmentId) + if err != nil { + return nil, err + } + resourceID, err := uuid.Parse(rt.ResourceId) + if err != nil { + return nil, err + } + + rows, err := db.GetQueries(ctx). + ListJobsByReleaseTargetWithStatuses(ctx, db.ListJobsByReleaseTargetWithStatusesParams{ + DeploymentID: deploymentID, + EnvironmentID: environmentID, + ResourceID: resourceID, + Statuses: []string{"in_progress", "action_required", "pending"}, + }) + if err != nil { + return nil, err + } + + jobs := make([]*oapi.Job, len(rows)) + for i, row := range rows { + jobs[i] = db.ToOapiJob(db.ListJobsByReleaseIDRow(row)) + } + return jobs, nil +} + +func (g *PostgresGetter) GetDeployment( + ctx context.Context, + deploymentID uuid.UUID, +) (*oapi.Deployment, error) { + row, err := db.GetQueries(ctx).GetDeploymentByID(ctx, deploymentID) + if err != nil { + return nil, err + } + return db.ToOapiDeployment(row), nil +} + +func (g *PostgresGetter) GetEnvironment( + ctx context.Context, + environmentID uuid.UUID, +) (*oapi.Environment, error) { + row, err := db.GetQueries(ctx).GetEnvironmentByID(ctx, environmentID) + if err != nil { + return nil, err + } + return db.ToOapiEnvironment(row), nil +} + +func (g *PostgresGetter) GetResource( + ctx context.Context, + resourceID uuid.UUID, +) (*oapi.Resource, error) { + row, err := db.GetQueries(ctx).GetResourceByID(ctx, resourceID) + if err != nil { + return nil, err + } + return db.ToOapiResource(row), nil +} + +func (g *PostgresGetter) ListJobAgentsByWorkspaceID( + ctx context.Context, + workspaceID uuid.UUID, +) ([]oapi.JobAgent, error) { + rows, err := db.GetQueries(ctx).ListJobAgentsByWorkspaceID(ctx, workspaceID) + if err != nil { + return nil, err + } + agents := make([]oapi.JobAgent, len(rows)) + for i, row := range rows { + agents[i] = *db.ToOapiJobAgent(row) + } + return agents, nil +} diff --git a/apps/workspace-engine/svc/controllers/forcedeploy/reconcile.go b/apps/workspace-engine/svc/controllers/forcedeploy/reconcile.go new file mode 100644 index 000000000..ae2ee6901 --- /dev/null +++ b/apps/workspace-engine/svc/controllers/forcedeploy/reconcile.go @@ -0,0 +1,158 @@ +package forcedeploy + +import ( + "context" + "fmt" + "time" + + "github.com/charmbracelet/log" + "github.com/google/uuid" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + "workspace-engine/pkg/oapi" + "workspace-engine/pkg/selector" + "workspace-engine/pkg/workspace/jobs" +) + +const requeueDelay = 5 * time.Second + +type ReconcileResult struct { + RequeueAfter time.Duration +} + +func Reconcile( + ctx context.Context, + workspaceID string, + getter Getter, + setter Setter, + rt *ReleaseTarget, +) (*ReconcileResult, error) { + ctx, span := tracer.Start(ctx, "forcedeploy.Reconcile") + defer span.End() + + workspaceUUID, err := uuid.Parse(workspaceID) + if err != nil { + return nil, fmt.Errorf("parse workspace id: %w", err) + } + + release, err := getter.GetDesiredRelease(ctx, rt) + if err != nil { + return nil, recordErr(span, "get desired release", err) + } + if release == nil { + log.Info("no desired release for release target, skipping", "rt", rt.ToOAPI().Key()) + return &ReconcileResult{}, nil + } + + activeJobs, err := getter.GetActiveJobsForReleaseTarget(ctx, rt.ToOAPI()) + if err != nil { + return nil, recordErr(span, "get active jobs", err) + } + if len(activeJobs) > 0 { + span.SetAttributes(attribute.Int("active_jobs", len(activeJobs))) + log.Info("release target has active jobs, requeueing", + "rt", rt.ToOAPI().Key(), + "activeJobs", len(activeJobs), + ) + return &ReconcileResult{RequeueAfter: requeueDelay}, nil + } + + if err := buildAndDispatchJob( + ctx, + span, + workspaceUUID, + getter, + setter, + rt, + release, + ); err != nil { + return nil, recordErr(span, "build and dispatch job", err) + } + + return &ReconcileResult{}, nil +} + +func buildAndDispatchJob( + ctx context.Context, + span trace.Span, + workspaceID uuid.UUID, + getter Getter, + setter Setter, + rt *ReleaseTarget, + release *oapi.Release, +) error { + deploymentID, err := uuid.Parse(release.ReleaseTarget.DeploymentId) + if err != nil { + return fmt.Errorf("parse deployment id: %w", err) + } + + deployment, err := getter.GetDeployment(ctx, deploymentID) + if err != nil { + return fmt.Errorf("get deployment: %w", err) + } + + if deployment.JobAgentSelector == "" { + span.AddEvent("no job agent selector configured") + return fmt.Errorf("no job agent selector configured for deployment '%s'", deployment.Name) + } + + resource, err := getter.GetResource(ctx, rt.ResourceID) + if err != nil { + return fmt.Errorf("get resource: %w", err) + } + + allAgents, err := getter.ListJobAgentsByWorkspaceID(ctx, workspaceID) + if err != nil { + return fmt.Errorf("list job agents: %w", err) + } + + matchResult := selector.MatchJobAgentsWithResourceDetailed( + deployment.JobAgentSelector, + allAgents, + resource, + ) + if matchResult.Err != nil { + return fmt.Errorf("match job agents: %w", matchResult.Err) + } + + matchedAgents := matchResult.Result.Matched + span.SetAttributes(attribute.Int("matched_agents", len(matchedAgents))) + + if len(matchedAgents) == 0 { + return fmt.Errorf( + "no job agents matched selector for deployment '%s' (selector=%q)", + deployment.Name, deployment.JobAgentSelector, + ) + } + + factory := jobs.NewFactoryFromGetters(getter) + for i := range matchedAgents { + agent := &matchedAgents[i] + agent.Config = oapi.DeepMergeConfigs( + agent.Config, deployment.JobAgentConfig, release.Version.JobAgentConfig, + ) + + job, err := factory.CreateJobForRelease(ctx, release, agent) + if err != nil { + return fmt.Errorf("create job for agent %s: %w", agent.Name, err) + } + + if err := setter.CreateJobAndEnqueueDispatch( + ctx, + job, + release, + workspaceID.String(), + ); err != nil { + return fmt.Errorf("create and enqueue job: %w", err) + } + } + + return nil +} + +func recordErr(span trace.Span, msg string, err error) error { + span.RecordError(err) + span.SetStatus(codes.Error, msg+" failed") + return fmt.Errorf("%s: %w", msg, err) +} diff --git a/apps/workspace-engine/svc/controllers/forcedeploy/reconcile_test.go b/apps/workspace-engine/svc/controllers/forcedeploy/reconcile_test.go new file mode 100644 index 000000000..815566b93 --- /dev/null +++ b/apps/workspace-engine/svc/controllers/forcedeploy/reconcile_test.go @@ -0,0 +1,318 @@ +package forcedeploy + +import ( + "context" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "workspace-engine/pkg/oapi" + "workspace-engine/pkg/reconcile" +) + +// --------------------------------------------------------------------------- +// Mock Getter +// --------------------------------------------------------------------------- + +type mockGetter struct { + rtExists bool + rtExistsErr error + + release *oapi.Release + releaseErr error + + activeJobs []*oapi.Job + activeJobsErr error + + deployment *oapi.Deployment + deploymentErr error + + environment *oapi.Environment + environmentErr error + + resource *oapi.Resource + resourceErr error + + workspaceAgents []oapi.JobAgent +} + +func (m *mockGetter) ReleaseTargetExists(_ context.Context, _ *ReleaseTarget) (bool, error) { + return m.rtExists, m.rtExistsErr +} + +func (m *mockGetter) GetDesiredRelease(_ context.Context, _ *ReleaseTarget) (*oapi.Release, error) { + return m.release, m.releaseErr +} + +func (m *mockGetter) GetActiveJobsForReleaseTarget( + _ context.Context, + _ *oapi.ReleaseTarget, +) ([]*oapi.Job, error) { + return m.activeJobs, m.activeJobsErr +} + +func (m *mockGetter) GetDeployment(_ context.Context, _ uuid.UUID) (*oapi.Deployment, error) { + return m.deployment, m.deploymentErr +} + +func (m *mockGetter) GetEnvironment(_ context.Context, _ uuid.UUID) (*oapi.Environment, error) { + return m.environment, m.environmentErr +} + +func (m *mockGetter) GetResource(_ context.Context, _ uuid.UUID) (*oapi.Resource, error) { + return m.resource, m.resourceErr +} + +func (m *mockGetter) ListJobAgentsByWorkspaceID( + _ context.Context, + _ uuid.UUID, +) ([]oapi.JobAgent, error) { + return m.workspaceAgents, nil +} + +var _ Getter = (*mockGetter)(nil) + +// --------------------------------------------------------------------------- +// Mock Setter +// --------------------------------------------------------------------------- + +type enqueueCall struct { + WorkspaceID string + JobID string +} + +type mockSetter struct { + createdJobs []*oapi.Job + createJobErr error + + enqueueCalls []enqueueCall +} + +func (m *mockSetter) CreateJobAndEnqueueDispatch( + _ context.Context, + job *oapi.Job, + _ *oapi.Release, + workspaceID string, +) error { + if m.createJobErr != nil { + return m.createJobErr + } + m.createdJobs = append(m.createdJobs, job) + m.enqueueCalls = append(m.enqueueCalls, enqueueCall{WorkspaceID: workspaceID, JobID: job.Id}) + return nil +} + +var _ Setter = (*mockSetter)(nil) + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +func testRT() *ReleaseTarget { + return &ReleaseTarget{ + WorkspaceID: uuid.New(), + DeploymentID: uuid.New(), + EnvironmentID: uuid.New(), + ResourceID: uuid.New(), + } +} + +func testRelease(rt *ReleaseTarget) *oapi.Release { + return &oapi.Release{ + Id: uuid.New(), + CreatedAt: time.Now().Format(time.RFC3339), + ReleaseTarget: oapi.ReleaseTarget{ + DeploymentId: rt.DeploymentID.String(), + EnvironmentId: rt.EnvironmentID.String(), + ResourceId: rt.ResourceID.String(), + }, + Variables: map[string]oapi.LiteralValue{}, + EncryptedVariables: []string{}, + Version: oapi.DeploymentVersion{ + Id: uuid.New().String(), + DeploymentId: rt.DeploymentID.String(), + Tag: "v1.0.0", + }, + } +} + +func testDeployment(rt *ReleaseTarget) *oapi.Deployment { + return &oapi.Deployment{ + Id: rt.DeploymentID.String(), + Name: "test-deployment", + Slug: "test-deployment", + Metadata: map[string]string{}, + JobAgentConfig: oapi.JobAgentConfig{}, + JobAgentSelector: "true", + } +} + +func testAgent() oapi.JobAgent { + return oapi.JobAgent{ + Id: uuid.New().String(), + Name: "test-agent", + Type: "test", + Config: oapi.JobAgentConfig{}, + } +} + +func defaultMocks(rt *ReleaseTarget, release *oapi.Release) (*mockGetter, *mockSetter) { + agent := testAgent() + getter := &mockGetter{ + rtExists: true, + release: release, + activeJobs: []*oapi.Job{}, + deployment: testDeployment(rt), + workspaceAgents: []oapi.JobAgent{agent}, + environment: &oapi.Environment{ + Id: rt.EnvironmentID.String(), + Name: "test-env", + Metadata: map[string]string{}, + }, + resource: &oapi.Resource{ + Id: rt.ResourceID.String(), + Name: "test-resource", + Kind: "TestKind", + Identifier: "test-resource-id", + Metadata: map[string]string{}, + Config: map[string]any{}, + }, + } + setter := &mockSetter{} + return getter, setter +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +func TestReconcile_HappyPath_CreatesJobAndEnqueuesDispatch(t *testing.T) { + rt := testRT() + release := testRelease(rt) + getter, setter := defaultMocks(rt, release) + + result, err := Reconcile(context.Background(), rt.WorkspaceID.String(), getter, setter, rt) + + require.NoError(t, err) + assert.Zero(t, result.RequeueAfter) + require.Len(t, setter.createdJobs, 1) + assert.Equal(t, release.Id.String(), setter.createdJobs[0].ReleaseId) + assert.Equal(t, oapi.JobStatusPending, setter.createdJobs[0].Status) + require.Len(t, setter.enqueueCalls, 1) + assert.Equal(t, rt.WorkspaceID.String(), setter.enqueueCalls[0].WorkspaceID) + assert.Equal(t, setter.createdJobs[0].Id, setter.enqueueCalls[0].JobID) +} + +func TestReconcile_HappyPath_WithCompletedJob(t *testing.T) { + rt := testRT() + release := testRelease(rt) + getter, setter := defaultMocks(rt, release) + + // A completed job already exists — should NOT block the redeploy. + // activeJobs only returns processing-state jobs, so completed jobs + // are not included. + getter.activeJobs = []*oapi.Job{} + + result, err := Reconcile(context.Background(), rt.WorkspaceID.String(), getter, setter, rt) + + require.NoError(t, err) + assert.Zero(t, result.RequeueAfter) + require.Len(t, setter.createdJobs, 1) +} + +func TestReconcile_NoDesiredRelease_Noop(t *testing.T) { + rt := testRT() + getter := &mockGetter{ + rtExists: true, + release: nil, + } + setter := &mockSetter{} + + result, err := Reconcile(context.Background(), rt.WorkspaceID.String(), getter, setter, rt) + + require.NoError(t, err) + assert.Zero(t, result.RequeueAfter) + assert.Empty(t, setter.createdJobs) + assert.Empty(t, setter.enqueueCalls) +} + +func TestReconcile_ActiveJobExists_Requeues(t *testing.T) { + rt := testRT() + release := testRelease(rt) + getter, setter := defaultMocks(rt, release) + + getter.activeJobs = []*oapi.Job{ + { + Id: uuid.New().String(), + ReleaseId: release.Id.String(), + Status: oapi.JobStatusInProgress, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + Metadata: map[string]string{}, + }, + } + + result, err := Reconcile(context.Background(), rt.WorkspaceID.String(), getter, setter, rt) + + require.NoError(t, err) + assert.Equal(t, requeueDelay, result.RequeueAfter) + assert.Empty(t, setter.createdJobs) + assert.Empty(t, setter.enqueueCalls) +} + +func TestReconcile_ActivePendingJob_Requeues(t *testing.T) { + rt := testRT() + release := testRelease(rt) + getter, setter := defaultMocks(rt, release) + + getter.activeJobs = []*oapi.Job{ + { + Id: uuid.New().String(), + ReleaseId: release.Id.String(), + Status: oapi.JobStatusPending, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + Metadata: map[string]string{}, + }, + } + + result, err := Reconcile(context.Background(), rt.WorkspaceID.String(), getter, setter, rt) + + require.NoError(t, err) + assert.Equal(t, requeueDelay, result.RequeueAfter) + assert.Empty(t, setter.createdJobs) +} + +func TestProcess_ActiveJob_ReturnsRequeueResult(t *testing.T) { + rt := testRT() + release := testRelease(rt) + getter, setter := defaultMocks(rt, release) + + getter.activeJobs = []*oapi.Job{ + { + Id: uuid.New().String(), + ReleaseId: release.Id.String(), + Status: oapi.JobStatusInProgress, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + Metadata: map[string]string{}, + }, + } + + ctrl := NewController(getter, setter) + item := reconcile.Item{ + ID: 1, + WorkspaceID: rt.WorkspaceID.String(), + Kind: "force-deploy", + ScopeType: "release-target", + ScopeID: rt.DeploymentID.String() + ":" + rt.EnvironmentID.String() + ":" + rt.ResourceID.String(), + } + + result, err := ctrl.Process(context.Background(), item) + + require.NoError(t, err) + assert.Equal(t, requeueDelay, result.RequeueAfter) + assert.Empty(t, setter.createdJobs) +} diff --git a/apps/workspace-engine/svc/controllers/forcedeploy/releasetarget.go b/apps/workspace-engine/svc/controllers/forcedeploy/releasetarget.go new file mode 100644 index 000000000..40683b95e --- /dev/null +++ b/apps/workspace-engine/svc/controllers/forcedeploy/releasetarget.go @@ -0,0 +1,50 @@ +package forcedeploy + +import ( + "fmt" + "strings" + + "github.com/google/uuid" + "workspace-engine/pkg/oapi" +) + +type ReleaseTarget struct { + WorkspaceID uuid.UUID + DeploymentID uuid.UUID + EnvironmentID uuid.UUID + ResourceID uuid.UUID +} + +func NewReleaseTarget(key string) (*ReleaseTarget, error) { + split := strings.SplitN(key, ":", 3) + if len(split) != 3 { + return nil, fmt.Errorf("invalid release target key: %s", key) + } + + deploymentID, err := uuid.Parse(split[0]) + if err != nil { + return nil, fmt.Errorf("invalid deployment id: %s", split[0]) + } + environmentID, err := uuid.Parse(split[1]) + if err != nil { + return nil, fmt.Errorf("invalid environment id: %s", split[1]) + } + resourceID, err := uuid.Parse(split[2]) + if err != nil { + return nil, fmt.Errorf("invalid resource id: %s", split[2]) + } + + return &ReleaseTarget{ + DeploymentID: deploymentID, + EnvironmentID: environmentID, + ResourceID: resourceID, + }, nil +} + +func (rt *ReleaseTarget) ToOAPI() *oapi.ReleaseTarget { + return &oapi.ReleaseTarget{ + DeploymentId: rt.DeploymentID.String(), + EnvironmentId: rt.EnvironmentID.String(), + ResourceId: rt.ResourceID.String(), + } +} diff --git a/apps/workspace-engine/svc/controllers/forcedeploy/setters.go b/apps/workspace-engine/svc/controllers/forcedeploy/setters.go new file mode 100644 index 000000000..ef801d702 --- /dev/null +++ b/apps/workspace-engine/svc/controllers/forcedeploy/setters.go @@ -0,0 +1,16 @@ +package forcedeploy + +import ( + "context" + + "workspace-engine/pkg/oapi" +) + +type Setter interface { + CreateJobAndEnqueueDispatch( + ctx context.Context, + job *oapi.Job, + release *oapi.Release, + workspaceID string, + ) error +} diff --git a/apps/workspace-engine/svc/controllers/forcedeploy/setters_postgres.go b/apps/workspace-engine/svc/controllers/forcedeploy/setters_postgres.go new file mode 100644 index 000000000..6222fc59a --- /dev/null +++ b/apps/workspace-engine/svc/controllers/forcedeploy/setters_postgres.go @@ -0,0 +1,115 @@ +package forcedeploy + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgtype" + "workspace-engine/pkg/db" + "workspace-engine/pkg/oapi" + reconciledb "workspace-engine/pkg/reconcile/postgres/db" +) + +var _ Setter = (*PostgresSetter)(nil) + +type PostgresSetter struct{} + +func (s *PostgresSetter) CreateJobAndEnqueueDispatch( + ctx context.Context, + job *oapi.Job, + release *oapi.Release, + workspaceID string, +) error { + jobID, err := uuid.Parse(job.Id) + if err != nil { + return fmt.Errorf("parse job id: %w", err) + } + + wsID, err := uuid.Parse(workspaceID) + if err != nil { + return fmt.Errorf("parse workspace id: %w", err) + } + + var jobAgentIDParam pgtype.UUID + if job.JobAgentId != "" { + parsed, err := uuid.Parse(job.JobAgentId) + if err != nil { + return fmt.Errorf("parse job agent id: %w", err) + } + jobAgentIDParam = pgtype.UUID{Bytes: parsed, Valid: true} + } + + jobAgentConfig, err := json.Marshal(job.JobAgentConfig) + if err != nil { + return fmt.Errorf("marshal job agent config: %w", err) + } + + dispatchContext, err := json.Marshal(job.DispatchContext) + if err != nil { + return fmt.Errorf("marshal dispatch context: %w", err) + } + + tx, err := db.GetPool(ctx).Begin(ctx) + if err != nil { + return fmt.Errorf("begin transaction: %w", err) + } + defer func() { _ = tx.Rollback(ctx) }() + + q := db.GetQueries(ctx).WithTx(tx) + + var completedAt pgtype.Timestamptz + if job.CompletedAt != nil { + completedAt = pgtype.Timestamptz{Time: *job.CompletedAt, Valid: true} + } + + if err := q.InsertJob(ctx, db.InsertJobParams{ + ID: jobID, + JobAgentID: jobAgentIDParam, + JobAgentConfig: jobAgentConfig, + Status: db.ToDBJobStatus(job.Status), + Message: toPgText(job.Message), + CreatedAt: pgtype.Timestamptz{Time: job.CreatedAt, Valid: !job.CreatedAt.IsZero()}, + UpdatedAt: pgtype.Timestamptz{Time: job.UpdatedAt, Valid: !job.UpdatedAt.IsZero()}, + CompletedAt: completedAt, + DispatchContext: dispatchContext, + }); err != nil { + return fmt.Errorf("insert job: %w", err) + } + + if err := q.InsertReleaseJob(ctx, db.InsertReleaseJobParams{ + ReleaseID: release.Id, + JobID: jobID, + }); err != nil { + return fmt.Errorf("insert release job: %w", err) + } + + now := time.Now() + rq := reconciledb.New(tx) + if err := rq.UpsertReconcileWorkItem(ctx, reconciledb.UpsertReconcileWorkItemParams{ + WorkspaceID: wsID, + Kind: "job-dispatch", + ScopeType: "job", + ScopeID: job.Id, + EventTs: pgtype.Timestamptz{Time: now, Valid: true}, + Priority: 100, + NotBefore: pgtype.Timestamptz{Time: now.Add(-1 * time.Second), Valid: true}, + }); err != nil { + return fmt.Errorf("enqueue job dispatch: %w", err) + } + + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("commit transaction: %w", err) + } + + return nil +} + +func toPgText(s *string) pgtype.Text { + if s == nil { + return pgtype.Text{} + } + return pgtype.Text{String: *s, Valid: true} +} diff --git a/packages/db/src/reconcilers/force-deploy.ts b/packages/db/src/reconcilers/force-deploy.ts new file mode 100644 index 000000000..de1ef5252 --- /dev/null +++ b/packages/db/src/reconcilers/force-deploy.ts @@ -0,0 +1,22 @@ +import type { Tx } from "../common.js"; +import type { ReconcileWorkScope } from "../schema/reconcile.js"; +import { enqueue } from "./enqueue.js"; + +const FORCE_DEPLOY_KIND = "force-deploy"; + +export async function enqueueForceDeploy( + db: Tx, + params: { + workspaceId: string; + deploymentId: string; + environmentId: string; + resourceId: string; + }, +): Promise { + return enqueue(db, { + workspaceId: params.workspaceId, + kind: FORCE_DEPLOY_KIND, + scopeType: "release-target", + scopeId: `${params.deploymentId}:${params.environmentId}:${params.resourceId}`, + }); +} diff --git a/packages/db/src/reconcilers/index.ts b/packages/db/src/reconcilers/index.ts index fc8e79828..6fe919688 100644 --- a/packages/db/src/reconcilers/index.ts +++ b/packages/db/src/reconcilers/index.ts @@ -1,4 +1,5 @@ export * from "./enqueue.js"; +export * from "./force-deploy.js"; export * from "./deployment-plan.js"; export * from "./deployment-selector-eval.js"; export * from "./environment-selector-eval.js"; diff --git a/packages/trpc/src/routes/redeploy.ts b/packages/trpc/src/routes/redeploy.ts index dbc08d712..41b7d456a 100644 --- a/packages/trpc/src/routes/redeploy.ts +++ b/packages/trpc/src/routes/redeploy.ts @@ -1,13 +1,7 @@ import type { Tx } from "@ctrlplane/db"; -import { v4 as uuidv4 } from "uuid"; import { z } from "zod"; -import { and, desc, eq, takeFirstOrNull } from "@ctrlplane/db"; -import { - enqueueDesiredRelease, - enqueueJobDispatch, -} from "@ctrlplane/db/reconcilers"; -import * as schema from "@ctrlplane/db/schema"; +import { enqueueForceDeploy } from "@ctrlplane/db/reconcilers"; import { protectedProcedure, router } from "../trpc.js"; @@ -17,81 +11,17 @@ type ReleaseTarget = { resourceId: string; }; -const getLatestJobForTarget = (db: Tx, releaseTarget: ReleaseTarget) => - db - .select() - .from(schema.job) - .innerJoin(schema.releaseJob, eq(schema.job.id, schema.releaseJob.jobId)) - .innerJoin( - schema.release, - eq(schema.releaseJob.releaseId, schema.release.id), - ) - .where( - and( - eq(schema.release.deploymentId, releaseTarget.deploymentId), - eq(schema.release.environmentId, releaseTarget.environmentId), - eq(schema.release.resourceId, releaseTarget.resourceId), - ), - ) - .orderBy(desc(schema.job.createdAt)) - .limit(1) - .then(takeFirstOrNull) - .then((result) => - result - ? { job: result.job, releaseId: result.release_job.releaseId } - : null, - ); - -const getJobMetadata = (db: Tx, jobId: string) => - db - .select() - .from(schema.jobMetadata) - .where(eq(schema.jobMetadata.jobId, jobId)) - .then((rows) => - Object.fromEntries(rows.map((row) => [row.key, row.value])), - ); - -const redeployReleaseTarget = async ( +const redeployReleaseTarget = ( db: Tx, workspaceId: string, releaseTarget: ReleaseTarget, -) => { - const result = await getLatestJobForTarget(db, releaseTarget); - if (result == null) - return enqueueDesiredRelease(db, { - workspaceId, - deploymentId: releaseTarget.deploymentId, - environmentId: releaseTarget.environmentId, - resourceId: releaseTarget.resourceId, - }); - - const { job, releaseId } = result; - const metadata = await getJobMetadata(db, job.id); - - const newJob = { - id: uuidv4(), - status: "pending" as const, - createdAt: new Date(), - updatedAt: new Date(), - jobAgentId: job.jobAgentId, - jobAgentConfig: job.jobAgentConfig, - dispatchContext: job.dispatchContext, - reason: "redeploy" as const, - metadata, - }; - - await db.transaction(async (tx) => { - await tx.insert(schema.job).values(newJob); - await tx.insert(schema.releaseJob).values({ - releaseId, - jobId: newJob.id, - }); - await enqueueJobDispatch(tx, { - workspaceId, - jobId: newJob.id, - }); +) => + enqueueForceDeploy(db, { + workspaceId, + deploymentId: releaseTarget.deploymentId, + environmentId: releaseTarget.environmentId, + resourceId: releaseTarget.resourceId, }); -}; export const redeployRouter = router({ releaseTarget: protectedProcedure