diff --git a/sandbox-sidecar/src/jobs/jobRunner.ts b/sandbox-sidecar/src/jobs/jobRunner.ts index 6197f9b6a..6ba1f5b49 100644 --- a/sandbox-sidecar/src/jobs/jobRunner.ts +++ b/sandbox-sidecar/src/jobs/jobRunner.ts @@ -23,7 +23,7 @@ export class JobRunner { logger.info({ job: job.id, runner: this.runner.name }, "sandbox job started"); try { - const result = await this.runner.run(job); + const result = await this.runner.run(job, (chunk) => this.store.appendLogs(job.id, chunk)); this.store.updateStatus(job.id, "succeeded", result.logs); this.store.setResult(job.id, result.result); logger.info({ job: job.id }, "sandbox job succeeded"); @@ -36,4 +36,3 @@ export class JobRunner { } } } - diff --git a/sandbox-sidecar/src/jobs/jobStore.ts b/sandbox-sidecar/src/jobs/jobStore.ts index 810146389..8384d013b 100644 --- a/sandbox-sidecar/src/jobs/jobStore.ts +++ b/sandbox-sidecar/src/jobs/jobStore.ts @@ -39,6 +39,13 @@ export class JobStore { job.updatedAt = new Date(); } + appendLogs(id: string, chunk: string) { + const job = this.jobs.get(id); + if (!job || !chunk) return; + job.logs = `${job.logs}${chunk}`; + job.updatedAt = new Date(); + } + setResult(id: string, result: SandboxRunResult | undefined) { const job = this.jobs.get(id); if (!job) return; @@ -46,4 +53,3 @@ export class JobStore { job.updatedAt = new Date(); } } - diff --git a/sandbox-sidecar/src/runners/e2bRunner.ts b/sandbox-sidecar/src/runners/e2bRunner.ts index 3b7d30ae4..84a6323a0 100644 --- a/sandbox-sidecar/src/runners/e2bRunner.ts +++ b/sandbox-sidecar/src/runners/e2bRunner.ts @@ -23,14 +23,14 @@ export class E2BSandboxRunner implements SandboxRunner { } } - async run(job: SandboxRunRecord): Promise { + async run(job: SandboxRunRecord, appendLog?: (chunk: string) => void): Promise { if (job.payload.operation === "plan") { - return this.runPlan(job); + return this.runPlan(job, appendLog); } - return this.runApply(job); + return this.runApply(job, appendLog); } - private async runPlan(job: SandboxRunRecord): Promise { + private async runPlan(job: SandboxRunRecord, appendLog?: (chunk: string) => void): Promise { const requestedVersion = job.payload.terraformVersion || "1.5.7"; const requestedEngine = job.payload.engine || "terraform"; const { sandbox, needsInstall } = await this.createSandbox(requestedVersion, requestedEngine); @@ -42,6 +42,10 @@ export class E2BSandboxRunner implements SandboxRunner { const workDir = await this.setupWorkspace(sandbox, job); const logs: string[] = []; + const streamLog = (chunk: string) => { + if (!chunk) return; + appendLog?.(chunk); + }; // Run terraform init await this.runTerraformCommand( @@ -49,6 +53,7 @@ export class E2BSandboxRunner implements SandboxRunner { workDir, ["init", "-input=false", "-no-color"], logs, + streamLog, ); // Run terraform plan @@ -56,14 +61,13 @@ export class E2BSandboxRunner implements SandboxRunner { if (job.payload.isDestroy) { planArgs.splice(1, 0, "-destroy"); } - await this.runTerraformCommand(sandbox, workDir, planArgs, logs); + await this.runTerraformCommand(sandbox, workDir, planArgs, logs, streamLog); // Get plan JSON const showResult = await this.runTerraformCommand( sandbox, workDir, ["show", "-json", "tfplan.binary"], - logs, ); const planJSON = showResult.stdout; @@ -76,24 +80,28 @@ export class E2BSandboxRunner implements SandboxRunner { planJSON: Buffer.from(planJSON, "utf8").toString("base64"), }; - return { logs: logs.join("\n"), result }; + return { logs: logs.join(""), result }; } finally { await sandbox.kill(); } } - private async runApply(job: SandboxRunRecord): Promise { + private async runApply(job: SandboxRunRecord, appendLog?: (chunk: string) => void): Promise { const requestedVersion = job.payload.terraformVersion || "1.5.7"; - const requestedEngine = job.payload.engine || "terraform"; - const { sandbox, needsInstall } = await this.createSandbox(requestedVersion, requestedEngine); - try { - // Install IaC tool if using fallback template - if (needsInstall) { - await this.installIacTool(sandbox, requestedEngine, requestedVersion); - } - - const workDir = await this.setupWorkspace(sandbox, job); - const logs: string[] = []; + const requestedEngine = job.payload.engine || "terraform"; + const { sandbox, needsInstall } = await this.createSandbox(requestedVersion, requestedEngine); + try { + // Install IaC tool if using fallback template + if (needsInstall) { + await this.installIacTool(sandbox, requestedEngine, requestedVersion); + } + + const workDir = await this.setupWorkspace(sandbox, job); + const logs: string[] = []; + const streamLog = (chunk: string) => { + if (!chunk) return; + appendLog?.(chunk); + }; // Run terraform init await this.runTerraformCommand( @@ -101,6 +109,7 @@ export class E2BSandboxRunner implements SandboxRunner { workDir, ["init", "-input=false", "-no-color"], logs, + streamLog, ); // Run terraform apply/destroy @@ -110,6 +119,7 @@ export class E2BSandboxRunner implements SandboxRunner { workDir, [applyCommand, "-auto-approve", "-input=false", "-no-color"], logs, + streamLog, ); // Read the state file @@ -119,7 +129,7 @@ export class E2BSandboxRunner implements SandboxRunner { state: Buffer.from(stateContent, "utf8").toString("base64"), }; - return { logs: logs.join("\n"), result }; + return { logs: logs.join(""), result }; } finally { await sandbox.kill(); } @@ -262,28 +272,41 @@ export class E2BSandboxRunner implements SandboxRunner { cwd: string, args: string[], logBuffer?: string[], + appendLog?: (chunk: string) => void, ): Promise<{ stdout: string; stderr: string }> { const engine = (sandbox as any)._requestedEngine || "terraform"; const binaryName = engine === "tofu" ? "tofu" : "terraform"; const cmdStr = `${binaryName} ${args.join(" ")}`; logger.info({ cmd: cmdStr, cwd, engine }, "running IaC command in E2B sandbox"); + let sawStream = false; + const pipeChunk = (chunk: string | undefined) => { + if (!chunk) return; + sawStream = true; + if (logBuffer) { + logBuffer.push(chunk); + } + appendLog?.(chunk); + }; + const result = await sandbox.commands.run(cmdStr, { cwd, envs: { TF_IN_AUTOMATION: "1", }, + onStdout: pipeChunk, + onStderr: pipeChunk, }); const stdout = result.stdout; const stderr = result.stderr; const exitCode = result.exitCode; + // Push any remaining buffered output for completeness in final log const mergedLogs = `${stdout}\n${stderr}`.trim(); - if (logBuffer && mergedLogs.length > 0) { - logBuffer.push(mergedLogs); + if (!sawStream && mergedLogs.length > 0) { + pipeChunk(mergedLogs + "\n"); } - if (exitCode !== 0) { throw new Error( `${binaryName} ${args[0]} exited with code ${exitCode}\n${mergedLogs}`, @@ -330,4 +353,3 @@ export class E2BSandboxRunner implements SandboxRunner { } } } - diff --git a/sandbox-sidecar/src/runners/types.ts b/sandbox-sidecar/src/runners/types.ts index a9d2d1f69..f610af5ad 100644 --- a/sandbox-sidecar/src/runners/types.ts +++ b/sandbox-sidecar/src/runners/types.ts @@ -7,6 +7,5 @@ export interface RunnerOutput { export interface SandboxRunner { readonly name: string; - run(job: SandboxRunRecord): Promise; + run(job: SandboxRunRecord, appendLog?: (chunk: string) => void): Promise; } - diff --git a/taco/internal/sandbox/e2b.go b/taco/internal/sandbox/e2b.go index 032b71c84..a77e70cd1 100644 --- a/taco/internal/sandbox/e2b.go +++ b/taco/internal/sandbox/e2b.go @@ -48,7 +48,7 @@ func (s *e2bSandbox) ExecutePlan(ctx context.Context, req *PlanRequest) (*PlanRe if req == nil { return nil, fmt.Errorf("plan request cannot be nil") } - + // Validate engine field is set if req.Engine == "" { return nil, fmt.Errorf("engine field is required but was empty") @@ -76,7 +76,7 @@ func (s *e2bSandbox) ExecutePlan(ctx context.Context, req *PlanRequest) (*PlanRe return nil, err } - status, err := s.waitForCompletion(ctx, jobID) + status, err := s.waitForCompletion(ctx, jobID, req.LogSink) if err != nil { return nil, err } @@ -109,7 +109,7 @@ func (s *e2bSandbox) ExecuteApply(ctx context.Context, req *ApplyRequest) (*Appl if req == nil { return nil, fmt.Errorf("apply request cannot be nil") } - + // Validate engine field is set if req.Engine == "" { return nil, fmt.Errorf("engine field is required but was empty") @@ -137,7 +137,7 @@ func (s *e2bSandbox) ExecuteApply(ctx context.Context, req *ApplyRequest) (*Appl return nil, err } - status, err := s.waitForCompletion(ctx, jobID) + status, err := s.waitForCompletion(ctx, jobID, req.LogSink) if err != nil { return nil, err } @@ -167,7 +167,7 @@ func (s *e2bSandbox) startRun(ctx context.Context, payload e2bRunRequest) (strin // Retry logic for transient failures (network issues, sidecar temporarily unavailable) maxRetries := 3 var lastErr error - + for attempt := 1; attempt <= maxRetries; attempt++ { if attempt > 1 { // Exponential backoff: 1s, 2s, 4s @@ -214,11 +214,11 @@ func (s *e2bSandbox) startRun(ctx context.Context, payload e2bRunRequest) (strin } return startResp.ID, nil } - + return "", fmt.Errorf("failed to start sandbox run after %d attempts: %w", maxRetries, lastErr) } -func (s *e2bSandbox) waitForCompletion(ctx context.Context, runID string) (*e2bRunStatusResponse, error) { +func (s *e2bSandbox) waitForCompletion(ctx context.Context, runID string, onLog func(string)) (*e2bRunStatusResponse, error) { ctx, cancel := context.WithTimeout(ctx, s.cfg.PollTimeout) defer cancel() @@ -226,10 +226,18 @@ func (s *e2bSandbox) waitForCompletion(ctx context.Context, runID string) (*e2bR defer ticker.Stop() var lastErr error + lastLen := 0 for { status, err := s.fetchStatus(ctx, runID) if err == nil { + // Stream incremental log chunks as we see new bytes + if onLog != nil && len(status.Logs) > lastLen { + chunk := status.Logs[lastLen:] + onLog(chunk) + lastLen = len(status.Logs) + } + switch strings.ToLower(status.Status) { case "succeeded", "completed", "done": return status, nil diff --git a/taco/internal/sandbox/types.go b/taco/internal/sandbox/types.go index 848d6d9dc..23c026c77 100644 --- a/taco/internal/sandbox/types.go +++ b/taco/internal/sandbox/types.go @@ -16,6 +16,9 @@ type PlanRequest struct { ConfigArchive []byte State []byte Metadata map[string]string + // LogSink is an optional callback that receives incremental log chunks + // as they are observed while polling the sandbox run. + LogSink func(chunk string) } // PlanResult captures the outcome of a sandboxed plan execution. @@ -43,6 +46,9 @@ type ApplyRequest struct { ConfigArchive []byte State []byte Metadata map[string]string + // LogSink is an optional callback that receives incremental log chunks + // as they are observed while polling the sandbox run. + LogSink func(chunk string) } // ApplyResult captures the outcome of a sandboxed apply. diff --git a/taco/internal/tfe/apply.go b/taco/internal/tfe/apply.go index dc9968f2f..96034773e 100644 --- a/taco/internal/tfe/apply.go +++ b/taco/internal/tfe/apply.go @@ -100,49 +100,25 @@ func (h *TfeHandler) GetApplyLogs(c echo.Context) error { return c.JSON(http.StatusNotFound, map[string]string{"error": "apply not found"}) } - // Try to get apply logs from blob storage - var logText string - applyLogBlobID := fmt.Sprintf("runs/%s/apply-logs.txt", run.ID) - - logData, err := h.blobStore.DownloadBlob(ctx, applyLogBlobID) - if err == nil { - logText = string(logData) - } else { - // If logs don't exist yet, return placeholder - if run.Status == "applying" || run.Status == "apply_queued" { - logText = "Waiting for apply to start...\n" - } else { - logText = "Apply logs not available\n" - } - } - - // Handle offset for streaming with proper byte accounting - // Stream format: [STX at offset 0][logText at offset 1+][ETX at offset 1+len(logText)] - var responseData []byte - - if offsetInt == 0 { - // First request: send STX + current logs - responseData = append([]byte{0x02}, []byte(logText)...) - fmt.Printf("📤 APPLY LOGS at offset=0: STX + %d bytes of log text\n", len(logText)) - } else { - // Client already received STX (1 byte at offset 0) - // Map stream offset to logText offset: streamOffset=1 → logText[0] - logOffset := offsetInt - 1 - - if logOffset < int64(len(logText)) { - // Send remaining log text - responseData = []byte(logText[logOffset:]) - fmt.Printf("📤 APPLY LOGS at offset=%d: sending %d bytes (logText[%d:])\n", - offsetInt, len(responseData), logOffset) - } else if logOffset == int64(len(logText)) && run.Status == "applied" { - // All logs sent, send ETX - responseData = []byte{0x03} - fmt.Printf("📤 Sending ETX (End of Text) for apply %s - logs complete\n", applyID) - } else { - // Waiting for more logs or already sent ETX - responseData = []byte{} - fmt.Printf("📤 APPLY LOGS at offset=%d: no new data (waiting or complete)\n", offsetInt) - } + responseData, err := streamChunkedLogs(ctx, h.blobStore, logStreamOptions{ + Prefix: "applies", + Label: "APPLY", + ID: run.ID, + Offset: offsetInt, + ChunkSize: 2 * 1024, + GenerateDefaultText: func() string { + if run.Status == "applying" || run.Status == "apply_queued" { + return "Waiting for apply to start...\n" + } + return "Apply logs not available\n" + }, + IsComplete: func() bool { + return run.Status == "applied" || run.Status == "errored" + }, + AppendETXOnFirst: true, // If already complete on first request, send ETX immediately + }) + if err != nil { + return err } c.Response().Header().Set("Content-Type", "text/plain") diff --git a/taco/internal/tfe/apply_executor.go b/taco/internal/tfe/apply_executor.go index 222b24d44..3f7d47c77 100644 --- a/taco/internal/tfe/apply_executor.go +++ b/taco/internal/tfe/apply_executor.go @@ -9,6 +9,7 @@ import ( "os/exec" "path/filepath" "strconv" + "sync" "time" "github.com/diggerhq/digger/opentaco/internal/domain" @@ -146,6 +147,81 @@ func (e *ApplyExecutor) ExecuteApply(ctx context.Context, runID string) error { } }() + // Chunked logging to prevent memory bloat + // Upload log chunks as separate S3 objects and clear buffer after each upload + // Fixed-size 2KB chunks enable offset-based chunk selection (reduces S3 re-downloads) + const chunkSize = 2 * 1024 // 2KB fixed size + chunkIndex := 1 + var logBuffer bytes.Buffer + var logMutex sync.Mutex + lastLogFlush := time.Now() + + // Flush helper - uploads current buffer as a padded 2KB chunk and clears it + flushLogs := func() error { + logMutex.Lock() + if logBuffer.Len() == 0 { + logMutex.Unlock() + return nil + } + + // Extract at most chunkSize bytes (2KB) + dataLen := logBuffer.Len() + if dataLen > chunkSize { + dataLen = chunkSize + } + data := make([]byte, dataLen) + copy(data, logBuffer.Bytes()[:dataLen]) + currentChunk := chunkIndex + chunkIndex++ // Increment NOW before unlock to reserve this chunk number atomically + + // Copy remainder BEFORE resetting (crucial - remainder slice points to internal buffer) + var remainderCopy []byte + if logBuffer.Len() > dataLen { + remainder := logBuffer.Bytes()[dataLen:] + remainderCopy = make([]byte, len(remainder)) + copy(remainderCopy, remainder) + } + + // Now safe to reset and write remainder back + logBuffer.Reset() + if len(remainderCopy) > 0 { + logBuffer.Write(remainderCopy) + } + logMutex.Unlock() + + // Pad to fixed 2KB size (rest will be null bytes) + paddedData := make([]byte, chunkSize) + copy(paddedData, data) + + // Upload this chunk (key includes zero-padded chunk index) + chunkKey := fmt.Sprintf("applies/%s/chunks/%08d.log", run.ID, currentChunk) + err := e.blobStore.UploadBlob(ctx, chunkKey, paddedData) + + if err == nil { + logMutex.Lock() + lastLogFlush = time.Now() + logMutex.Unlock() + } + return err + } + + appendLog := func(message string) { + logMutex.Lock() + logBuffer.WriteString(message) + now := time.Now() + // Flush if buffer exceeds 2KB or 1s has passed + shouldFlush := logBuffer.Len() > chunkSize || now.Sub(lastLogFlush) > 1*time.Second + logMutex.Unlock() + + if shouldFlush { + _ = flushLogs() + } + } + + defer func() { + _ = flushLogs() + }() + // Update run status to "applying" if err := e.runRepo.UpdateRunStatus(ctx, runID, "applying"); err != nil { logger.Error("failed to update run status", slog.String("error", err.Error())) @@ -154,6 +230,9 @@ func (e *ApplyExecutor) ExecuteApply(ctx context.Context, runID string) error { logger.Info("updated run status to applying") + appendLog("Starting terraform apply...\n") + appendLog("Downloading configuration...\n") + // Get configuration version configVer, err := e.configVerRepo.GetConfigurationVersion(ctx, run.ConfigurationVersionID) if err != nil { @@ -167,6 +246,8 @@ func (e *ApplyExecutor) ExecuteApply(ctx context.Context, runID string) error { return e.handleApplyError(ctx, run.ID, logger, fmt.Sprintf("Failed to download archive: %v", err)) } + appendLog("Extracting workspace...\n") + // Extract to temp directory workDir, err := extractArchive(archiveData) if err != nil { @@ -222,6 +303,10 @@ func (e *ApplyExecutor) ExecuteApply(ctx context.Context, runID string) error { ) if useSandbox { + appendLog("Starting remote execution environment...\n") + appendLog("Initializing terraform...\n") + appendLog("Running terraform apply...\n") + if applyActivityID != "" && e.activityRepo != nil { applyActivityStart = time.Now() if err := e.activityRepo.MarkRunning(ctx, applyActivityID, applyActivityStart, e.sandbox.Name()); err != nil { @@ -229,7 +314,25 @@ func (e *ApplyExecutor) ExecuteApply(ctx context.Context, runID string) error { } } - result, execErr := e.executeApplyInSandbox(ctx, run, unitMeta, workspaceArchive, stateData) + // Start heartbeat goroutine for long-running remote applies + heartbeatDone := make(chan struct{}) + go func() { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + appendLog(fmt.Sprintf("Remote apply in progress... (%s)\n", time.Now().Format("15:04:05"))) + case <-heartbeatDone: + return + case <-ctx.Done(): + return + } + } + }() + defer close(heartbeatDone) + + result, execErr := e.executeApplyInSandbox(ctx, run, unitMeta, workspaceArchive, stateData, appendLog) applySandboxResult = result applyErr = execErr if result != nil { @@ -253,10 +356,16 @@ func (e *ApplyExecutor) ExecuteApply(ctx context.Context, runID string) error { } } - // Store apply logs in blob storage (use UploadBlob - no lock checks needed for logs) - applyLogBlobID := fmt.Sprintf("runs/%s/apply-logs.txt", run.ID) - if storeErr := e.blobStore.UploadBlob(ctx, applyLogBlobID, []byte(logs)); storeErr != nil { - logger.Error("failed to store apply logs", slog.String("error", storeErr.Error())) + // Append the actual terraform output to the progress logs + if !useSandbox { + appendLog("\n" + logs) + } + + // Store final status + if applyErr != nil { + appendLog("\n\nApply failed\n") + } else { + appendLog("\n\nApply complete\n") } // Update run status @@ -264,7 +373,7 @@ func (e *ApplyExecutor) ExecuteApply(ctx context.Context, runID string) error { if applyErr != nil { runStatus = "errored" logs = logs + "\n\nError: " + applyErr.Error() - _ = e.blobStore.UploadBlob(ctx, applyLogBlobID, []byte(logs)) + // Error already logged via appendLog in the executor if updateErr := e.runRepo.UpdateRunError(ctx, run.ID, applyErr.Error()); updateErr != nil { logger.Error("failed to update run error", slog.String("error", updateErr.Error())) } @@ -281,7 +390,7 @@ func (e *ApplyExecutor) ExecuteApply(ctx context.Context, runID string) error { runStatus = "errored" errMsg := fmt.Sprintf("Failed to upload state: %v", uploadErr) logs = logs + "\n\nCritical Error: " + errMsg + "\n" - _ = e.blobStore.UploadBlob(ctx, applyLogBlobID, []byte(logs)) + // Error already logged via appendLog in the executor if updateErr := e.runRepo.UpdateRunError(ctx, run.ID, errMsg); updateErr != nil { logger.Error("failed to update run error", slog.String("error", updateErr.Error())) } @@ -393,7 +502,7 @@ func (e *ApplyExecutor) handleApplyError(ctx context.Context, runID string, logg return fmt.Errorf("apply execution failed: %s", errorMsg) } -func (e *ApplyExecutor) executeApplyInSandbox(ctx context.Context, run *domain.TFERun, unit *storage.UnitMetadata, archive []byte, stateData []byte) (*sandbox.ApplyResult, error) { +func (e *ApplyExecutor) executeApplyInSandbox(ctx context.Context, run *domain.TFERun, unit *storage.UnitMetadata, archive []byte, stateData []byte, logSink func(string)) (*sandbox.ApplyResult, error) { if e.sandbox == nil { return nil, fmt.Errorf("sandbox provider not configured") } @@ -424,6 +533,7 @@ func (e *ApplyExecutor) executeApplyInSandbox(ctx context.Context, run *domain.T ConfigArchive: archive, State: stateData, Metadata: metadata, + LogSink: logSink, } return e.sandbox.ExecuteApply(ctx, req) } diff --git a/taco/internal/tfe/configuration-versions.go b/taco/internal/tfe/configuration-versions.go index dfbf17fbc..e051e3dba 100644 --- a/taco/internal/tfe/configuration-versions.go +++ b/taco/internal/tfe/configuration-versions.go @@ -67,7 +67,7 @@ func (h *TfeHandler) CreateConfigurationVersions(c echo.Context) error { // Get org and user context orgIdentifier, _ := c.Get("organization_id").(string) userID, _ := c.Get("user_id").(string) - + if orgIdentifier == "" { orgIdentifier = "default-org" } @@ -102,10 +102,10 @@ func (h *TfeHandler) CreateConfigurationVersions(c echo.Context) error { } `json:"attributes"` } `json:"data"` } - - speculative := false // Default to false (normal apply) + + speculative := false // Default to false (normal apply) autoQueueRuns := false - + // Manually decode JSON since content-type is application/vnd.api+json if err := json.NewDecoder(c.Request().Body).Decode(&requestPayload); err == nil { if requestPayload.Data.Attributes.Speculative != nil { @@ -128,8 +128,8 @@ func (h *TfeHandler) CreateConfigurationVersions(c echo.Context) error { UnitID: unitID, Status: "pending", Source: "cli", - Speculative: speculative, // Parse from CLI request - AutoQueueRuns: autoQueueRuns, // Parse from CLI request + Speculative: speculative, // Parse from CLI request + AutoQueueRuns: autoQueueRuns, // Parse from CLI request Provisional: false, StatusTimestamps: "{}", CreatedBy: userID, @@ -153,7 +153,7 @@ func (h *TfeHandler) CreateConfigurationVersions(c echo.Context) error { if err != nil { return err } - + fmt.Printf("DEBUG Generated upload URL: %s\n", signedUploadUrl) cv := tfe.ConfigurationVersionRecord{ @@ -180,7 +180,6 @@ func (h *TfeHandler) CreateConfigurationVersions(c echo.Context) error { return nil } - func (h *TfeHandler) UploadConfigurationArchive(c echo.Context) error { ctx := c.Request().Context() configVersionID := c.Param("id") @@ -220,4 +219,4 @@ func (h *TfeHandler) UploadConfigurationArchive(c echo.Context) error { // 200 OK, empty body. Terraform does not expect JSON here. return c.NoContent(http.StatusOK) -} \ No newline at end of file +} diff --git a/taco/internal/tfe/log_stream.go b/taco/internal/tfe/log_stream.go new file mode 100644 index 000000000..6b0122ed8 --- /dev/null +++ b/taco/internal/tfe/log_stream.go @@ -0,0 +1,101 @@ +package tfe + +import ( + "context" + "fmt" + "strings" + + "github.com/diggerhq/digger/opentaco/internal/storage" +) + +// logStreamOptions captures the parameters needed to stream chunked logs with STX/ETX framing. +type logStreamOptions struct { + Prefix string // e.g., "plans" or "applies" + Label string // e.g., "PLAN" or "APPLY" for logging + ID string // planID or apply/runID (part of the S3 key) + Offset int64 // client-requested offset (includes STX at 0) + ChunkSize int // fixed chunk size in bytes (must match writer) + GenerateDefaultText func() string // default text on first request when no chunks + IsComplete func() bool // signals whether execution is finished/errored + AppendETXOnFirst bool // if true, append ETX on the first request when complete +} + +// streamChunkedLogs reads fixed-size padded chunks from blob storage and returns the bytes to send. +func streamChunkedLogs(ctx context.Context, blobStore storage.UnitStore, opts logStreamOptions) ([]byte, error) { + chunkSize := opts.ChunkSize + if chunkSize == 0 { + chunkSize = 2 * 1024 + } + + startChunk := 1 + if opts.Offset > 1 { // offset includes STX byte at 0 + logOffset := opts.Offset - 1 + startChunk = int(logOffset / int64(chunkSize)) + startChunk++ // 1-indexed chunk numbers + } + bytesBefore := int64(chunkSize * (startChunk - 1)) + + var fullLogs strings.Builder + chunkIndex := startChunk + for { + chunkKey := fmt.Sprintf("%s/%s/chunks/%08d.log", opts.Prefix, opts.ID, chunkIndex) + logData, err := blobStore.DownloadBlob(ctx, chunkKey) + if err != nil { + // Missing chunk; only continue if not complete (more chunks may appear later) + if opts.IsComplete != nil && opts.IsComplete() { + break + } + break + } + fullLogs.Write(logData) + chunkIndex++ + } + + logText := fullLogs.String() + // Trim padding after assembling the requested window + logText = strings.TrimRight(logText, "\x00") + + if logText == "" && opts.Offset == 0 && opts.GenerateDefaultText != nil { + logText = opts.GenerateDefaultText() + } + + var responseData []byte + + if opts.Offset == 0 { + // First request: send STX + current logs + responseData = append([]byte{0x02}, []byte(logText)...) + fmt.Printf("📤 %s LOGS at offset=0: STX + %d bytes of log text\n", opts.Label, len(logText)) + if len(logText) > 0 { + fmt.Printf("Log preview (first 200 chars): %.200s\n", logText) + } + if opts.AppendETXOnFirst && opts.IsComplete != nil && opts.IsComplete() { + responseData = append(responseData, 0x03) + fmt.Printf("📤 Sending ETX for %s %s (complete at first request)\n", opts.Label, opts.ID) + } + } else { + // Map stream offset to logText offset: + // - stream offset 0 = STX + // - stream offset 1 = first byte of full logs + logOffset := opts.Offset - 1 - bytesBefore + if logOffset < 0 { + logOffset = 0 + } + + if logOffset < int64(len(logText)) { + // Send remaining log text + responseData = []byte(logText[logOffset:]) + fmt.Printf("📤 %s LOGS at offset=%d: sending %d bytes (logText[%d:])\n", + opts.Label, opts.Offset, len(responseData), logOffset) + } else if opts.IsComplete != nil && opts.IsComplete() { + // All logs sent, send ETX to stop polling + responseData = []byte{0x03} + fmt.Printf("📤 Sending ETX (End of Text) for %s %s - logs complete\n", opts.Label, opts.ID) + } else { + // Waiting for more logs + responseData = []byte{} + fmt.Printf("📤 %s LOGS at offset=%d: no new data (waiting or complete)\n", opts.Label, opts.Offset) + } + } + + return responseData, nil +} diff --git a/taco/internal/tfe/plan.go b/taco/internal/tfe/plan.go index 635addfb9..51829ba88 100644 --- a/taco/internal/tfe/plan.go +++ b/taco/internal/tfe/plan.go @@ -63,7 +63,7 @@ func (h *TfeHandler) GetPlan(c echo.Context) error { ID: plan.RunID, }, } - + // Only include resource counts when plan is finished // If we send HasChanges:false before the plan completes, Terraform CLI // will think there's nothing to apply and won't prompt for confirmation! @@ -103,54 +103,22 @@ func (h *TfeHandler) GetPlanLogs(c echo.Context) error { if err != nil { return c.JSON(http.StatusNotFound, map[string]string{"error": "plan not found"}) } - - // Check if logs exist in blob storage - var logText string - if plan.LogBlobID != nil { - // Try to get logs from blob storage - logData, err := h.blobStore.DownloadBlob(ctx, *plan.LogBlobID) - if err != nil { - fmt.Printf("Failed to get logs from blob storage: %v\n", err) - // Fall back to default logs - logText = generateDefaultPlanLogs(plan) - } else { - logText = string(logData) - } - } else { - // Generate default logs based on plan status - logText = generateDefaultPlanLogs(plan) - } - - // Handle offset for streaming with proper byte accounting - // Stream format: [STX at offset 0][logText at offset 1+][ETX at offset 1+len(logText)] - var responseData []byte - - if offsetInt == 0 { - // First request: send STX + current logs - responseData = append([]byte{0x02}, []byte(logText)...) - fmt.Printf("📤 PLAN LOGS at offset=0: STX + %d bytes of log text\n", len(logText)) - if len(logText) > 0 { - fmt.Printf("Log preview (first 200 chars): %.200s\n", logText) - } - } else { - // Client already received STX (1 byte at offset 0) - // Map stream offset to logText offset: streamOffset=1 → logText[0] - logOffset := offsetInt - 1 - - if logOffset < int64(len(logText)) { - // Send remaining log text - responseData = []byte(logText[logOffset:]) - fmt.Printf("📤 PLAN LOGS at offset=%d: sending %d bytes (logText[%d:])\n", - offsetInt, len(responseData), logOffset) - } else if logOffset == int64(len(logText)) && plan.Status == "finished" { - // All logs sent, send ETX - responseData = []byte{0x03} - fmt.Printf("📤 Sending ETX (End of Text) for plan %s - logs complete\n", planID) - } else { - // Waiting for more logs or already sent ETX - responseData = []byte{} - fmt.Printf("📤 PLAN LOGS at offset=%d: no new data (waiting or complete)\n", offsetInt) - } + responseData, err := streamChunkedLogs(ctx, h.blobStore, logStreamOptions{ + Prefix: "plans", + Label: "PLAN", + ID: planID, + Offset: offsetInt, + ChunkSize: 2 * 1024, + GenerateDefaultText: func() string { + return generateDefaultPlanLogs(plan) + }, + IsComplete: func() bool { + return plan.Status == "finished" || plan.Status == "errored" + }, + AppendETXOnFirst: false, + }) + if err != nil { + return err } c.Response().Header().Set(echo.HeaderContentType, "text/plain") @@ -191,7 +159,7 @@ func (h *TfeHandler) GetPlanJSONOutput(c echo.Context) error { // Create dummy resource changes based on our counts // The CLI checks if this array has entries to decide whether to prompt resourceChanges := make([]interface{}, 0) - + // Add placeholder entries for additions for i := 0; i < plan.ResourceAdditions; i++ { resourceChanges = append(resourceChanges, map[string]interface{}{ @@ -200,7 +168,7 @@ func (h *TfeHandler) GetPlanJSONOutput(c echo.Context) error { }, }) } - + // Add placeholder entries for changes for i := 0; i < plan.ResourceChanges; i++ { resourceChanges = append(resourceChanges, map[string]interface{}{ @@ -209,7 +177,7 @@ func (h *TfeHandler) GetPlanJSONOutput(c echo.Context) error { }, }) } - + // Add placeholder entries for destructions for i := 0; i < plan.ResourceDestructions; i++ { resourceChanges = append(resourceChanges, map[string]interface{}{ @@ -218,7 +186,7 @@ func (h *TfeHandler) GetPlanJSONOutput(c echo.Context) error { }, }) } - + jsonPlan["resource_changes"] = resourceChanges } @@ -231,7 +199,7 @@ func generateDefaultPlanLogs(plan *domain.TFEPlan) string { // Don't show resource counts in logs until plan is finished // Terraform CLI parses the logs to determine if changes exist! if plan.Status == "finished" { - return fmt.Sprintf(`Terraform used the selected providers to generate the following execution plan. + return fmt.Sprintf(`Terraform used the selected providers to generate the following execution plan. Resource actions are indicated with the following symbols: + create - destroy @@ -245,4 +213,3 @@ Plan: %d to add, %d to change, %d to destroy. // The CLI will keep polling until it gets real content. return "" } - diff --git a/taco/internal/tfe/plan_executor.go b/taco/internal/tfe/plan_executor.go index 310151cec..d6cd5ae6e 100644 --- a/taco/internal/tfe/plan_executor.go +++ b/taco/internal/tfe/plan_executor.go @@ -13,6 +13,7 @@ import ( "path/filepath" "strconv" "strings" + "sync" "time" "github.com/diggerhq/digger/opentaco/internal/domain" @@ -59,7 +60,7 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error { slog.String("operation", "execute_plan"), slog.String("run_id", runID), ) - + logger.Info("starting plan execution") // Get run @@ -68,7 +69,7 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error { logger.Error("failed to get run", slog.String("error", err.Error())) return fmt.Errorf("failed to get run: %w", err) } - logger.Info("retrieved run", + logger.Info("retrieved run", slog.String("config_version_id", run.ConfigurationVersionID), slog.String("unit_id", run.UnitID)) @@ -99,7 +100,7 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error { var planActivityID string var planActivityStart time.Time var planSandboxResult *sandbox.PlanResult - + if useSandbox { logger.Info("✅ PLAN EXECUTOR: Remote execution path selected", slog.String("unit_id", run.UnitID), @@ -108,16 +109,16 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error { logger.Info("â„šī¸ PLAN EXECUTOR: Local execution path selected", slog.String("unit_id", run.UnitID)) } - + if useSandbox && e.activityRepo != nil { activity := &domain.RemoteRunActivity{ - RunID: run.ID, - OrgID: run.OrgID, - UnitID: run.UnitID, - Operation: "plan", - Status: "pending", - TriggeredBy: run.CreatedBy, - TriggeredSource: run.Source, + RunID: run.ID, + OrgID: run.OrgID, + UnitID: run.UnitID, + Operation: "plan", + Status: "pending", + TriggeredBy: run.CreatedBy, + TriggeredSource: run.Source, } if id, err := e.activityRepo.CreateActivity(ctx, activity); err != nil { @@ -141,18 +142,18 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error { Version: "1.0.0", Created: time.Now(), } - - logger.Info("acquiring unit lock", + + logger.Info("acquiring unit lock", slog.String("unit_id", run.UnitID), slog.String("lock_id", lockInfo.ID)) - + if err := e.unitRepo.Lock(ctx, run.UnitID, lockInfo); err != nil { if err == storage.ErrLockConflict { // Unit is locked by another operation currentLock, _ := e.unitRepo.GetLock(ctx, run.UnitID) - errMsg := fmt.Sprintf("Unit is locked by another operation (locked by: %s). Please wait and try again.", + errMsg := fmt.Sprintf("Unit is locked by another operation (locked by: %s). Please wait and try again.", currentLock.Who) - logger.Warn("lock conflict - unit already locked", + logger.Warn("lock conflict - unit already locked", slog.String("unit_id", run.UnitID), slog.String("locked_by", currentLock.Who), slog.String("lock_id", currentLock.ID)) @@ -161,12 +162,12 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error { logger.Error("failed to acquire lock", slog.String("error", err.Error())) return e.handlePlanError(ctx, run.ID, run.PlanID, logger, fmt.Sprintf("Failed to acquire lock: %v", err)) } - + logger.Info("unit lock acquired successfully") - + // Track whether lock has been manually released (to avoid double-unlock in defer) lockReleased := false - + // Ensure lock is released when we're done (success or failure) defer func() { if lockReleased { @@ -175,7 +176,7 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error { } logger.Info("releasing unit lock", slog.String("unit_id", run.UnitID)) if unlockErr := e.unitRepo.Unlock(ctx, run.UnitID, lockInfo.ID); unlockErr != nil { - logger.Error("failed to release lock", + logger.Error("failed to release lock", slog.String("error", unlockErr.Error()), slog.String("unit_id", run.UnitID), slog.String("lock_id", lockInfo.ID)) @@ -184,6 +185,83 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error { } }() + // Chunked logging to prevent memory bloat + // Upload log chunks as separate S3 objects and clear buffer after each upload + // Fixed-size 2KB chunks enable offset-based chunk selection (reduces S3 re-downloads) + const chunkSize = 2 * 1024 // 2KB fixed size + chunkIndex := 1 + var logBuffer bytes.Buffer + var logMutex sync.Mutex + lastLogFlush := time.Now() + + // Flush helper - uploads current buffer as a padded 2KB chunk and clears it + flushLogs := func() error { + logMutex.Lock() + if logBuffer.Len() == 0 { + logMutex.Unlock() + return nil + } + + // Extract at most chunkSize bytes (2KB) + dataLen := logBuffer.Len() + if dataLen > chunkSize { + dataLen = chunkSize + } + data := make([]byte, dataLen) + copy(data, logBuffer.Bytes()[:dataLen]) + currentChunk := chunkIndex + chunkIndex++ // Increment NOW before unlock to reserve this chunk number atomically + + // Copy remainder BEFORE resetting (crucial - remainder slice points to internal buffer) + var remainderCopy []byte + if logBuffer.Len() > dataLen { + remainder := logBuffer.Bytes()[dataLen:] + remainderCopy = make([]byte, len(remainder)) + copy(remainderCopy, remainder) + } + + // Now safe to reset and write remainder back + logBuffer.Reset() + if len(remainderCopy) > 0 { + logBuffer.Write(remainderCopy) + } + logMutex.Unlock() + + // Pad to fixed 2KB size (rest will be null bytes) + paddedData := make([]byte, chunkSize) + copy(paddedData, data) + + // Upload this chunk (key includes zero-padded chunk index) + chunkKey := fmt.Sprintf("plans/%s/chunks/%08d.log", *run.PlanID, currentChunk) + err := e.blobStore.UploadBlob(ctx, chunkKey, paddedData) + + if err == nil { + logMutex.Lock() + lastLogFlush = time.Now() + logMutex.Unlock() + } + return err + } + + // Buffered append - only uploads when buffer is large or time has elapsed + appendLog := func(message string) { + logMutex.Lock() + logBuffer.WriteString(message) + now := time.Now() + // Flush if buffer exceeds 2KB or 1s has passed + shouldFlush := logBuffer.Len() > chunkSize || now.Sub(lastLogFlush) > 1*time.Second + logMutex.Unlock() + + if shouldFlush { + _ = flushLogs() // Ignore errors for progress updates + } + } + + // Ensure logs are flushed at the end + defer func() { + _ = flushLogs() + }() + // Update run status to "planning" if err := e.runRepo.UpdateRunStatus(ctx, runID, "planning"); err != nil { logger.Error("failed to update status to planning", slog.String("error", err.Error())) @@ -191,6 +269,11 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error { } logger.Info("updated run status to planning") + // Note: We no longer set LogBlobID since we use chunked logging + // The API reads chunks directly from plans/{planID}/chunks/*.log + + appendLog("Preparing terraform run...\n") + // Get configuration version configVer, err := e.configVerRepo.GetConfigurationVersion(ctx, run.ConfigurationVersionID) if err != nil { @@ -202,6 +285,8 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error { return e.handlePlanError(ctx, run.ID, run.PlanID, logger, "Configuration not uploaded") } + appendLog("Downloading configuration...\n") + // Download configuration archive from blob storage archivePath := fmt.Sprintf("config-versions/%s/archive.tar.gz", configVer.ID) archiveData, err := e.blobStore.DownloadBlob(ctx, archivePath) @@ -209,10 +294,12 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error { return e.handlePlanError(ctx, run.ID, run.PlanID, logger, fmt.Sprintf("Failed to download archive: %v", err)) } - logger.Info("downloaded configuration archive", + logger.Info("downloaded configuration archive", slog.Int("bytes", len(archiveData)), slog.String("config_version_id", configVer.ID)) + appendLog("Extracting workspace...\n") + // Extract to temp directory workDir, err := extractArchive(archiveData) if err != nil { @@ -230,6 +317,7 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error { var workspaceArchive []byte if useSandbox { + appendLog("Packaging workspace for remote execution...\n") workspaceArchive, err = createWorkspaceArchive(workDir) if err != nil { return e.handlePlanError(ctx, run.ID, run.PlanID, logger, fmt.Sprintf("Failed to package workspace for sandbox execution: %v", err)) @@ -247,18 +335,18 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error { slog.String("state_id", stateID), slog.Int("bytes", len(stateData))) } else { - // Write state to terraform.tfstate in the working directory - statePath := filepath.Join(workDir, "terraform.tfstate") - if err := os.WriteFile(statePath, stateData, 0644); err != nil { - logger.Warn("failed to write state file", slog.String("error", err.Error())) - } else { - logger.Info("downloaded and wrote existing state", - slog.String("state_id", stateID), - slog.Int("bytes", len(stateData))) + // Write state to terraform.tfstate in the working directory + statePath := filepath.Join(workDir, "terraform.tfstate") + if err := os.WriteFile(statePath, stateData, 0644); err != nil { + logger.Warn("failed to write state file", slog.String("error", err.Error())) + } else { + logger.Info("downloaded and wrote existing state", + slog.String("state_id", stateID), + slog.Int("bytes", len(stateData))) } } } else { - logger.Info("no existing state found, starting fresh", + logger.Info("no existing state found, starting fresh", slog.String("state_id", stateID)) } @@ -273,13 +361,16 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error { ) if useSandbox { + appendLog("Starting remote execution environment...\n") + appendLog("Initializing terraform...\n") + logger.Info("🚀 EXECUTING PLAN IN SANDBOX", slog.String("run_id", run.ID), slog.String("unit_id", run.UnitID), slog.String("sandbox_provider", e.sandbox.Name()), slog.Int("workspace_archive_bytes", len(workspaceArchive)), slog.Int("state_bytes", len(stateData))) - + if planActivityID != "" && e.activityRepo != nil { planActivityStart = time.Now() if err := e.activityRepo.MarkRunning(ctx, planActivityID, planActivityStart, e.sandbox.Name()); err != nil { @@ -289,10 +380,29 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error { } } - result, execErr := e.executePlanInSandbox(ctx, run, unitMeta, workspaceArchive, stateData) + // Start heartbeat goroutine for long-running remote executions + // This provides user feedback and prevents the UI from appearing frozen + heartbeatDone := make(chan struct{}) + go func() { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + appendLog(fmt.Sprintf("Remote plan in progress... (%s)\n", time.Now().Format("15:04:05"))) + case <-heartbeatDone: + return + case <-ctx.Done(): + return + } + } + }() + defer close(heartbeatDone) + + result, execErr := e.executePlanInSandbox(ctx, run, unitMeta, workspaceArchive, stateData, appendLog) planSandboxResult = result planErr = execErr - + if execErr != nil { logger.Error("❌ SANDBOX PLAN FAILED", slog.String("run_id", run.ID), @@ -302,7 +412,7 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error { slog.String("run_id", run.ID), slog.Bool("has_changes", result != nil && result.HasChanges)) } - + if result != nil { logs = result.Logs hasChanges = result.HasChanges @@ -319,7 +429,7 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error { slog.String("run_id", run.ID), slog.String("unit_id", run.UnitID), slog.String("work_dir", workDir)) - + _, planLogs, planHasChanges, planAdds, planChanges, planDestroys, execErr := e.runTerraformPlan(ctx, workDir, run.IsDestroy) logs = planLogs hasChanges = planHasChanges @@ -327,7 +437,7 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error { changes = planChanges destroys = planDestroys planErr = execErr - + if execErr != nil { logger.Error("❌ LOCAL PLAN FAILED", slog.String("run_id", run.ID), @@ -358,10 +468,16 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error { } } - // Store logs in blob storage (use UploadBlob - no lock checks needed for logs) - logBlobID := fmt.Sprintf("plans/%s/logs.txt", *run.PlanID) - if err := e.blobStore.UploadBlob(ctx, logBlobID, []byte(logs)); err != nil { - logger.Error("failed to store plan logs", slog.String("error", err.Error())) + // Append the actual terraform output to the progress logs + if !useSandbox { + appendLog("\n" + logs) + } + + // Store final status + if planErr != nil { + appendLog("\n\nPlan failed\n") + } else { + appendLog("\n\nPlan complete\n") } // Generate signed log URL @@ -371,7 +487,7 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error { planStatus := "finished" if planErr != nil { planStatus = "errored" - logs = logs + "\n\nError: " + planErr.Error() + appendLog("\nError: " + planErr.Error() + "\n") // Store error in run for user visibility if updateErr := e.runRepo.UpdateRunError(ctx, run.ID, planErr.Error()); updateErr != nil { logger.Error("failed to update run error", slog.String("error", updateErr.Error())) @@ -384,8 +500,8 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error { ResourceChanges: &changes, ResourceDestructions: &destroys, HasChanges: &hasChanges, - LogBlobID: &logBlobID, - LogReadURL: &logReadURL, + // LogBlobID removed - we use chunked logging now + LogReadURL: &logReadURL, } if len(planJSON) > 0 { jsonStr := string(planJSON) @@ -400,21 +516,21 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error { // Use "planned" status (not "planned_and_finished") - this is what Terraform CLI expects runStatus := "planned" canApply := (planErr == nil) // Can apply if plan succeeded (regardless of whether there are changes) - + if planErr != nil { runStatus = "errored" } - logger.Info("updating run status", + logger.Info("updating run status", slog.String("status", runStatus), slog.Bool("can_apply", canApply)) - + if err := e.runRepo.UpdateRunStatusAndCanApply(ctx, run.ID, runStatus, canApply); err != nil { logger.Error("failed to update run", slog.String("error", err.Error())) return fmt.Errorf("failed to update run: %w", err) } - logger.Info("plan execution completed", + logger.Info("plan execution completed", slog.String("status", runStatus), slog.Bool("can_apply", canApply), slog.Bool("has_changes", hasChanges), @@ -423,31 +539,31 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error { slog.Int("destroys", destroys)) // Only auto-trigger apply if AutoApply flag is true (i.e., terraform apply -auto-approve) - logger.Debug("auto-apply check", + logger.Debug("auto-apply check", slog.Bool("auto_apply", run.AutoApply), slog.Bool("plan_succeeded", planErr == nil)) - + if run.AutoApply && planErr == nil { logger.Info("triggering auto-apply") - + // Queue the apply by updating the run status if err := e.runRepo.UpdateRunStatus(ctx, run.ID, "apply_queued"); err != nil { logger.Error("failed to queue apply", slog.String("error", err.Error())) return nil // Don't fail the plan if we can't queue the apply } - + // CRITICAL: Release the plan lock BEFORE spawning apply goroutine // Otherwise we get a race condition where apply tries to acquire while plan still holds it logger.Info("releasing plan lock before triggering apply", slog.String("unit_id", run.UnitID)) if unlockErr := e.unitRepo.Unlock(ctx, run.UnitID, lockInfo.ID); unlockErr != nil { - logger.Error("failed to release plan lock before apply", + logger.Error("failed to release plan lock before apply", slog.String("error", unlockErr.Error()), slog.String("unit_id", run.UnitID)) return fmt.Errorf("failed to release lock before apply: %w", unlockErr) } lockReleased = true // Mark as released to prevent defer from trying again logger.Info("plan lock released, apply can now acquire it") - + // Trigger apply execution in background // Use a new context to avoid cancellation propagation issues applyCtx, cancel := context.WithCancel(context.Background()) @@ -465,7 +581,7 @@ func (e *PlanExecutor) ExecutePlan(ctx context.Context, runID string) error { applyLogger.Info("apply execution completed successfully") } }() - + // Return without triggering defer (lock already released) return nil } @@ -510,7 +626,7 @@ func (e *PlanExecutor) runTerraformPlan(ctx context.Context, workDir string, isD // Run terraform plan WITHOUT -out to avoid "Saved the plan to..." message // We'll run it again with -json to get structured data logger.Info("running terraform plan for human-readable output") - + if isDestroy { hasChanges, err = tf.Plan(ctx, tfexec.Destroy(true)) } else { @@ -530,20 +646,20 @@ func (e *PlanExecutor) runTerraformPlan(ctx context.Context, workDir string, isD // Reset buffer and redirect to discard human output from this run logBuffer.Reset() var jsonBuffer bytes.Buffer - + logger.Info("running terraform plan for structured JSON output") planFile := filepath.Join(workDir, "tfplan") // Temporarily redirect output so we don't pollute logs with duplicate plan tf.SetStdout(&jsonBuffer) tf.SetStderr(&jsonBuffer) - + if isDestroy { _, err = tf.Plan(ctx, tfexec.Destroy(true), tfexec.Out(planFile)) } else { _, err = tf.Plan(ctx, tfexec.Out(planFile)) } - + if err != nil { logger.Warn("failed to generate structured plan", slog.String("error", err.Error())) // Not fatal - we have the human-readable logs already @@ -570,12 +686,12 @@ func (e *PlanExecutor) runTerraformPlan(ctx context.Context, workDir string, isD } else if actions.Replace() { adds++ destroys++ - } - } + } + } } } } - + hasChanges = adds > 0 || changes > 0 || destroys > 0 logger.Info("plan completed", @@ -685,7 +801,7 @@ func extractArchive(data []byte) (string, error) { func cleanupWorkDir(dir string) { if dir != "" { if err := os.RemoveAll(dir); err != nil { - slog.Warn("failed to cleanup work directory", + slog.Warn("failed to cleanup work directory", slog.String("dir", dir), slog.String("error", err.Error())) } @@ -700,29 +816,29 @@ func createBackendOverride(workDir string) error { if err != nil { return err } - + // Only process .tf files if !info.IsDir() && strings.HasSuffix(path, ".tf") { content, err := os.ReadFile(path) if err != nil { return fmt.Errorf("failed to read %s: %w", path, err) } - + contentStr := string(content) - + // Check if file contains terraform block with cloud or backend if strings.Contains(contentStr, "cloud") || strings.Contains(contentStr, "backend") { slog.Info("removing cloud/backend configuration", slog.String("file", path)) - + // Comment out cloud and backend blocks // This is a simple approach - we comment out lines containing "cloud {" and "backend " lines := strings.Split(contentStr, "\n") var inBlock bool var blockDepth int - + for i, line := range lines { trimmed := strings.TrimSpace(line) - + // Start of cloud or backend block if (strings.Contains(trimmed, "cloud {") || strings.Contains(trimmed, "backend ")) && !strings.HasPrefix(trimmed, "#") { lines[i] = "# " + line + " # Disabled by TFE executor" @@ -730,37 +846,37 @@ func createBackendOverride(workDir string) error { blockDepth = strings.Count(line, "{") - strings.Count(line, "}") continue } - + // Inside block - comment out if inBlock { blockDepth += strings.Count(line, "{") - strings.Count(line, "}") lines[i] = "# " + line - + if blockDepth <= 0 { inBlock = false } } } - + modifiedContent := strings.Join(lines, "\n") if err := os.WriteFile(path, []byte(modifiedContent), info.Mode()); err != nil { return fmt.Errorf("failed to write %s: %w", path, err) } } } - + return nil }) - + if err != nil { return fmt.Errorf("failed to process terraform files: %w", err) } - + slog.Info("successfully removed cloud/backend configuration from terraform files") return nil } -func (e *PlanExecutor) executePlanInSandbox(ctx context.Context, run *domain.TFERun, unit *storage.UnitMetadata, archive []byte, stateData []byte) (*sandbox.PlanResult, error) { +func (e *PlanExecutor) executePlanInSandbox(ctx context.Context, run *domain.TFERun, unit *storage.UnitMetadata, archive []byte, stateData []byte, logSink func(string)) (*sandbox.PlanResult, error) { if e.sandbox == nil { return nil, fmt.Errorf("sandbox provider not configured") } @@ -793,6 +909,7 @@ func (e *PlanExecutor) executePlanInSandbox(ctx context.Context, run *domain.TFE ConfigArchive: archive, State: stateData, Metadata: metadata, + LogSink: logSink, } return e.sandbox.ExecutePlan(ctx, req) } diff --git a/taco/internal/tfe/runs.go b/taco/internal/tfe/runs.go index 89e4a10d1..83ce57b24 100644 --- a/taco/internal/tfe/runs.go +++ b/taco/internal/tfe/runs.go @@ -8,7 +8,9 @@ import ( "log/slog" "net/http" "os" + "time" + "github.com/diggerhq/digger/opentaco/internal/auth" "github.com/diggerhq/digger/opentaco/internal/domain" "github.com/diggerhq/digger/opentaco/internal/domain/tfe" "github.com/google/jsonapi" @@ -42,7 +44,7 @@ func (h *TfeHandler) GetRun(c echo.Context) error { // Determine if run is confirmable (waiting for user approval) // Status is "planned" when waiting for confirmation isConfirmable := run.Status == "planned" && run.CanApply && !run.AutoApply - + // Determine if run has changes (Terraform CLI uses this!) var hasChanges bool var planData *domain.TFEPlan @@ -71,7 +73,7 @@ func (h *TfeHandler) GetRun(c echo.Context) error { // Use unit ID as workspace ID (they're the same in our architecture) // Terraform CLI expects workspace ID in the format "ws-{uuid}" workspaceID := "ws-" + run.UnitID - + // Build response response := tfe.TFERun{ ID: run.ID, @@ -146,6 +148,9 @@ func (h *TfeHandler) GetRun(c echo.Context) error { // Build plan data for included section publicBase := os.Getenv("OPENTACO_PUBLIC_BASE_URL") + // Generate a fresh signed token for log streaming (path-based to survive CLI stripping queries) + logToken, _ := auth.GenerateLogStreamToken(planData.ID, 24*time.Hour) + logReadURL := fmt.Sprintf("%s/tfe/api/v2/plans/%s/logs/%s", publicBase, planData.ID, logToken) planData := map[string]interface{}{ "id": planData.ID, "type": "plans", @@ -155,7 +160,7 @@ func (h *TfeHandler) GetRun(c echo.Context) error { "resource-additions": planData.ResourceAdditions, "resource-changes": planData.ResourceChanges, "resource-destructions": planData.ResourceDestructions, - "log-read-url": fmt.Sprintf("%s/tfe/api/v2/plans/%s/logs", publicBase, planData.ID), + "log-read-url": logReadURL, }, "relationships": map[string]interface{}{ "run": map[string]interface{}{ @@ -170,10 +175,6 @@ func (h *TfeHandler) GetRun(c echo.Context) error { // Add included section to the document runDoc["included"] = []interface{}{planData} - // DEBUG: Log the exact JSON response - debugJSON, _ := json.MarshalIndent(runDoc, "", " ") - logger.Info("📤 FULL JSON RESPONSE WITH PLAN", slog.String("json", string(debugJSON))) - c.Response().Header().Set(echo.HeaderContentType, "application/vnd.api+json") c.Response().WriteHeader(http.StatusOK) if err := json.NewEncoder(c.Response().Writer).Encode(runDoc); err != nil { @@ -273,7 +274,7 @@ func (h *TfeHandler) CreateRun(c echo.Context) error { isDestroy := requestData.Data.Attributes.IsDestroy autoApply := requestData.Data.Attributes.AutoApply planOnlyFromCLI := requestData.Data.Attributes.PlanOnly // Pointer - can be nil - + // Log the full request for debugging planOnlyValue := "not-set" if planOnlyFromCLI != nil { @@ -283,7 +284,7 @@ func (h *TfeHandler) CreateRun(c echo.Context) error { planOnlyValue = "false" } } - logger.Info("create run request", + logger.Info("create run request", slog.String("message", message), slog.Bool("is_destroy", isDestroy), slog.Bool("auto_apply_from_cli", autoApply), @@ -293,7 +294,7 @@ func (h *TfeHandler) CreateRun(c echo.Context) error { // Get org and user context from middleware orgIdentifier, _ := c.Get("organization_id").(string) userID, _ := c.Get("user_id").(string) - + if orgIdentifier == "" { orgIdentifier = "default-org" // Fallback for testing } @@ -305,7 +306,7 @@ func (h *TfeHandler) CreateRun(c echo.Context) error { // This is critical for S3 path construction: //terraform.tfstate orgUUID, err := h.identifierResolver.ResolveOrganization(ctx, orgIdentifier) if err != nil { - logger.Error("failed to resolve organization", + logger.Error("failed to resolve organization", slog.String("org_identifier", orgIdentifier), slog.String("error", err.Error())) return c.JSON(http.StatusInternalServerError, map[string]interface{}{ @@ -316,7 +317,7 @@ func (h *TfeHandler) CreateRun(c echo.Context) error { }}, }) } - logger.Info("resolved organization", + logger.Info("resolved organization", slog.String("org_identifier", orgIdentifier), slog.String("org_uuid", orgUUID)) @@ -343,7 +344,7 @@ func (h *TfeHandler) CreateRun(c echo.Context) error { if unit.TFEExecutionMode != nil && *unit.TFEExecutionMode != "" { executionMode = *unit.TFEExecutionMode } - + logger.Info("🔍 DECISION POINT: Checking execution mode", slog.String("unit_id", unitID), slog.String("unit_name", unit.Name), @@ -355,7 +356,7 @@ func (h *TfeHandler) CreateRun(c echo.Context) error { } return "none" }())) - + if executionMode == "remote" && h.sandbox == nil { logger.Warn("❌ BLOCKED: remote run creation - sandbox provider not configured", slog.String("unit_id", unitID), @@ -368,7 +369,7 @@ func (h *TfeHandler) CreateRun(c echo.Context) error { }}, }) } - + if executionMode == "remote" { logger.Info("✅ APPROVED: Remote execution will be used", slog.String("unit_id", unitID), @@ -399,7 +400,7 @@ func (h *TfeHandler) CreateRun(c echo.Context) error { // 3. Either one being true results in auto-apply workspaceAutoApply := unit.TFEAutoApply != nil && *unit.TFEAutoApply finalAutoApply := workspaceAutoApply || autoApply - + logger.Info("determining auto-apply setting", slog.Bool("workspace_auto_apply", workspaceAutoApply), slog.Bool("cli_auto_approve", autoApply), @@ -429,8 +430,8 @@ func (h *TfeHandler) CreateRun(c echo.Context) error { ConfigurationVersionID: cvID, CreatedBy: userID, } - - logger.Info("creating run", + + logger.Info("creating run", slog.Bool("auto_apply", finalAutoApply), slog.Bool("plan_only", run.PlanOnly)) @@ -445,7 +446,7 @@ func (h *TfeHandler) CreateRun(c echo.Context) error { }) } - logger.Info("created run", + logger.Info("created run", slog.String("run_id", run.ID), slog.String("unit_id", unitID)) @@ -468,7 +469,7 @@ func (h *TfeHandler) CreateRun(c echo.Context) error { }) } - logger.Info("created plan", + logger.Info("created plan", slog.String("plan_id", plan.ID), slog.String("run_id", run.ID)) @@ -490,7 +491,7 @@ func (h *TfeHandler) CreateRun(c echo.Context) error { planLogger.Info("starting async plan execution") // Create plan executor executor := NewPlanExecutor(h.runRepo, h.planRepo, h.configVerRepo, h.blobStore, h.unitRepo, h.sandbox, h.runActivityRepo) - + // Execute the plan (this will run terraform plan) if err := executor.ExecutePlan(planCtx, run.ID); err != nil { planLogger.Error("plan execution failed", slog.String("error", err.Error())) @@ -527,7 +528,7 @@ func (h *TfeHandler) CreateRun(c echo.Context) error { ID: cvID, }, } - + // For auto-apply runs, include Apply reference immediately so Terraform CLI knows to expect it if run.AutoApply { response.Apply = &tfe.ApplyRef{ID: run.ID} @@ -627,7 +628,7 @@ func (h *TfeHandler) ApplyRun(c echo.Context) error { applyLogger.Info("starting async apply execution") // Create apply executor executor := NewApplyExecutor(h.runRepo, h.planRepo, h.configVerRepo, h.blobStore, h.unitRepo, h.sandbox, h.runActivityRepo) - + // Execute the apply (this will run terraform apply) if err := executor.ExecuteApply(applyCtx, runID); err != nil { applyLogger.Error("apply execution failed", slog.String("error", err.Error())) @@ -665,7 +666,7 @@ func (h *TfeHandler) ApplyRun(c echo.Context) error { if run.PlanID != nil { response.Plan = &tfe.PlanRef{ID: *run.PlanID} } - + // Include Apply reference so Terraform CLI knows to fetch apply logs response.Apply = &tfe.ApplyRef{ID: run.ID} logger.Debug("added apply reference", slog.String("apply_id", run.ID)) @@ -700,7 +701,7 @@ func (h *TfeHandler) GetRunEvents(c echo.Context) error { // Generate events based on run status (JSON:API format with type field) events := []map[string]interface{}{} eventCounter := 0 - + // Helper to create a properly formatted event addEvent := func(action, description string) { eventCounter++ @@ -714,7 +715,7 @@ func (h *TfeHandler) GetRunEvents(c echo.Context) error { }, }) } - + // Always include "run created" event addEvent("created", "Run was created") diff --git a/taco/internal/tfe/tfe.go b/taco/internal/tfe/tfe.go index 4bf612ea5..8fca20e71 100644 --- a/taco/internal/tfe/tfe.go +++ b/taco/internal/tfe/tfe.go @@ -17,7 +17,7 @@ type TfeHandler struct { rbacManager *rbac.RBACManager apiTokens *auth.APITokenManager identifierResolver domain.IdentifierResolver // For resolving org external IDs - + // TFE repositories for runs, plans, and configuration versions runRepo domain.TFERunRepository planRepo domain.TFEPlanRepository diff --git a/taco/internal/tfe/well_known.go b/taco/internal/tfe/well_known.go index 5bde6a098..440f65e21 100644 --- a/taco/internal/tfe/well_known.go +++ b/taco/internal/tfe/well_known.go @@ -20,7 +20,7 @@ func (h *TfeHandler) MessageOfTheDay(c echo.Context) error { logger.Debug("TFE message of the day", "operation", "tfe_motd", ) - + c.Response().Header().Set(echo.HeaderContentType, "application/json") c.Response().Header().Set("Tfp-Api-Version", "2.5") c.Response().Header().Set("X-Terraform-Enterprise-App", "Terraform Enterprise") @@ -35,7 +35,7 @@ func (h *TfeHandler) GetWellKnownJson(c echo.Context) error { logger.Info("TFE well-known discovery", "operation", "tfe_well_known", ) - + c.Response().Header().Set(echo.HeaderContentType, "application/json") c.Response().Header().Set("Tfp-Api-Version", "2.5") c.Response().Header().Set("X-Terraform-Enterprise-App", "Terraform Enterprise") diff --git a/taco/internal/tfe/workspaces.go b/taco/internal/tfe/workspaces.go index 2279b3158..fc32b715f 100644 --- a/taco/internal/tfe/workspaces.go +++ b/taco/internal/tfe/workspaces.go @@ -103,24 +103,24 @@ func (h *TfeHandler) convertWorkspaceToStateIDWithOrg(ctx context.Context, orgId return "", fmt.Errorf("invalid workspace name: empty after stripping ws- prefix") } } - + // If no org identifier provided or no resolver, return workspace name as-is (backwards compat) if orgIdentifier == "" || h.identifierResolver == nil { return workspaceName, nil } - + // Step 1: Resolve organization identifier (external_org_id or UUID) to UUID orgUUID, err := h.identifierResolver.ResolveOrganization(ctx, orgIdentifier) if err != nil { return "", fmt.Errorf("failed to resolve organization '%s': %w", orgIdentifier, err) } - + // Step 2: Resolve unit name to UUID within the organization unitUUID, err := h.identifierResolver.ResolveUnit(ctx, workspaceName, orgUUID) if err != nil { return "", fmt.Errorf("failed to resolve unit '%s' in org '%s': %w", workspaceName, orgIdentifier, err) } - + // Return org-scoped unit path for S3 storage // Format: / (both UUIDs for immutable, rename-safe paths) // Example: "123e4567-e89b-12d3-a456-426614174000/987f6543-e21a-43d2-b789-123456789abc" @@ -168,14 +168,14 @@ func getOrgFromContext(c echo.Context) (string, error) { return orgStr, nil } } - + // Try organization_id (set by WebhookAuth middleware) if orgID := c.Get("organization_id"); orgID != nil { if orgStr, ok := orgID.(string); ok && orgStr != "" { return orgStr, nil } } - + // No organization context found - this is an error condition return "", fmt.Errorf("no organization context found in request") } @@ -219,7 +219,7 @@ func extractWorkspaceIDFromParam(c echo.Context) string { // checkWorkspacePermission handles the three RBAC scenarios correctly func (h *TfeHandler) checkWorkspacePermission(c echo.Context, action string, workspaceID string) error { - + // Scenario 1: No RBAC manager (memory storage) → permissive mode if h.rbacManager == nil { return nil @@ -288,8 +288,7 @@ func (h *TfeHandler) checkWorkspacePermission(c echo.Context, action string, wor // Webhook auth uses internal token + X-User-ID/X-Email headers userIDHeader := c.Request().Header.Get("X-User-ID") userEmailHeader := c.Request().Header.Get("X-Email") - - + var principal rbac.Principal if userIDHeader != "" && userEmailHeader != "" { // This is webhook auth from internal proxy (UI) - user already verified @@ -307,7 +306,7 @@ func (h *TfeHandler) checkWorkspacePermission(c echo.Context, action string, wor if err != nil { return fmt.Errorf("failed to get organization context: %v", err) } - + if tokenRecord, err := h.apiTokens.Verify(c.Request().Context(), orgID, token); err == nil { principal = rbac.Principal{ Subject: tokenRecord.Subject, @@ -336,7 +335,7 @@ func (h *TfeHandler) checkWorkspacePermission(c echo.Context, action string, wor } // Check permission using RBAC manager - + allowed, err := h.rbacManager.Can(c.Request().Context(), principal, rbacAction, stateID) if err != nil { return fmt.Errorf("failed to check permissions: %v", err) @@ -357,7 +356,7 @@ func (h *TfeHandler) GetWorkspace(c echo.Context) error { orgParam := c.Param("org_name") workspaceName := c.Param("workspace_name") - + if workspaceName == "" { logger.Warn("Invalid workspace name", "operation", "tfe_get_workspace", @@ -365,10 +364,10 @@ func (h *TfeHandler) GetWorkspace(c echo.Context) error { ) return c.JSON(400, map[string]string{"error": "workspace_name invalid"}) } - + // Parse org param - supports both "Display:identifier" and just "identifier" displayName, orgIdentifier := parseOrgParam(orgParam) - + logger.Info("Getting TFE workspace", "operation", "tfe_get_workspace", "org_param", orgParam, @@ -376,7 +375,7 @@ func (h *TfeHandler) GetWorkspace(c echo.Context) error { "org_identifier", orgIdentifier, "workspace_name", workspaceName, ) - + // Convert workspace name to unit ID (org-scoped if org provided) // workspaceName is now the human-readable unit name, not a UUID stateID, err := h.convertWorkspaceToStateIDWithOrg(c.Request().Context(), orgIdentifier, workspaceName) @@ -388,20 +387,20 @@ func (h *TfeHandler) GetWorkspace(c echo.Context) error { "error", err, ) return c.JSON(500, map[string]string{ - "error": "failed to resolve workspace", + "error": "failed to resolve workspace", "detail": err.Error(), }) } - + logger.Debug("Resolved workspace state ID", "operation", "tfe_get_workspace", "state_id", stateID, ) - + // Extract unit UUID from state ID - repository expects just the UUID unitUUID := extractUnitUUID(stateID) fmt.Printf("GetWorkspace: Extracted unitUUID=%s from stateID=%s\n", unitUUID, stateID) - + // Fetch unit to get execution mode and lock status unit, err := h.unitRepo.Get(c.Request().Context(), unitUUID) if err != nil { @@ -418,64 +417,64 @@ func (h *TfeHandler) GetWorkspace(c echo.Context) error { }}, }) } - + // Determine execution mode (default to 'local' if not specified) executionMode := "local" if unit.TFEExecutionMode != nil && *unit.TFEExecutionMode != "" { executionMode = *unit.TFEExecutionMode } - + // Determine auto-apply setting (default to false if not specified) autoApply := false if unit.TFEAutoApply != nil { autoApply = *unit.TFEAutoApply } - + // Check if unit is locked locked := unit.Locked var currentRun *tfe.TFERun if locked && unit.LockID != "" { - currentRun = &tfe.TFERun{ + currentRun = &tfe.TFERun{ ID: unit.LockID, } } workspace := &tfe.TFEWorkspace{ - ID: tfe.NewTfeResourceIdentifier(tfe.WorkspaceType, workspaceName).String(), - Actions: &tfe.TFEWorkspaceActions{IsDestroyable: true}, - AgentPoolID: tfe.NewTfeResourceIdentifier(tfe.AgentPoolType, "HzEaJWMP5YTatZaS").String(), - AllowDestroyPlan: false, - AutoApply: autoApply, - CanQueueDestroyPlan: false, - CreatedAt: time.Time{}, - UpdatedAt: time.Time{}, - Description: workspaceName, - Environment: workspaceName, - ExecutionMode: executionMode, - FileTriggersEnabled: false, - GlobalRemoteState: false, - Locked: locked, - MigrationEnvironment: "", - Name: workspaceName, - Operations: true, - Permissions: &tfe.TFEWorkspacePermissions{ - CanDestroy: true, - CanForceUnlock: true, - CanLock: true, - CanQueueApply: true, - CanQueueDestroy: true, - CanQueueRun: true, - CanUnlock: true, - CanUpdate: true, - CanReadSettings: true, + ID: tfe.NewTfeResourceIdentifier(tfe.WorkspaceType, workspaceName).String(), + Actions: &tfe.TFEWorkspaceActions{IsDestroyable: true}, + AgentPoolID: tfe.NewTfeResourceIdentifier(tfe.AgentPoolType, "HzEaJWMP5YTatZaS").String(), + AllowDestroyPlan: false, + AutoApply: autoApply, + CanQueueDestroyPlan: false, + CreatedAt: time.Time{}, + UpdatedAt: time.Time{}, + Description: workspaceName, + Environment: workspaceName, + ExecutionMode: executionMode, + FileTriggersEnabled: false, + GlobalRemoteState: false, + Locked: locked, + MigrationEnvironment: "", + Name: workspaceName, + Operations: true, + Permissions: &tfe.TFEWorkspacePermissions{ + CanDestroy: true, + CanForceUnlock: true, + CanLock: true, + CanQueueApply: true, + CanQueueDestroy: true, + CanQueueRun: true, + CanUnlock: true, + CanUpdate: true, + CanReadSettings: true, CanUpdateVariable: false, }, QueueAllRuns: false, - SpeculativeEnabled: false, // False = allow confirmable applies, True = all runs are plan-only + SpeculativeEnabled: false, // False = allow confirmable applies, True = all runs are plan-only SourceName: "", SourceURL: "", StructuredRunOutputEnabled: false, - TerraformVersion: "1.5.6", + TerraformVersion: "1.5.6", TriggerPrefixes: []string{}, TriggerPatterns: []string{}, VCSRepo: nil, @@ -487,9 +486,9 @@ func (h *TfeHandler) GetWorkspace(c echo.Context) error { RunFailures: 0, RunsCount: 0, TagNames: []string{}, - CurrentRun: currentRun, // Include lock details when workspace is locked + CurrentRun: currentRun, // Include lock details when workspace is locked Organization: &tfe.TFEOrganization{ - Name: orgParam, // Return the full org param (includes display name if provided) + Name: orgParam, // Return the full org param (includes display name if provided) }, Outputs: nil, } @@ -518,7 +517,7 @@ func (h *TfeHandler) LockWorkspace(c echo.Context) error { // Strip ws- prefix to get workspace name workspaceName := convertWorkspaceToStateID(workspaceID) - + // Get org from authentication context (JWT claim or webhook header) orgIdentifier, err := getOrgFromContext(c) if err != nil { @@ -528,11 +527,11 @@ func (h *TfeHandler) LockWorkspace(c echo.Context) error { "error", err, ) return c.JSON(http.StatusUnauthorized, map[string]string{ - "error": "Organization context required", + "error": "Organization context required", "detail": err.Error(), }) } - + logger.Info("Locking TFE workspace", "operation", "tfe_lock_workspace", "workspace_id", workspaceID, @@ -550,7 +549,7 @@ func (h *TfeHandler) LockWorkspace(c echo.Context) error { "error", err, ) return c.JSON(500, map[string]string{ - "error": "failed to resolve workspace", + "error": "failed to resolve workspace", "detail": err.Error(), }) } @@ -646,14 +645,14 @@ func (h *TfeHandler) LockWorkspace(c echo.Context) error { // Return success with full workspace object (properly formatted JSON:API) fmt.Printf("LockWorkspace: Returning success\n") - + // Build a workspace response with lock info logger.Info("Workspace locked successfully", "operation", "tfe_lock_workspace", "unit_uuid", unitUUID, "lock_id", lockInfo.ID, ) - + workspace := &tfe.TFEWorkspace{ ID: tfe.NewTfeResourceIdentifier(tfe.WorkspaceType, workspaceName).String(), Name: workspaceName, @@ -662,7 +661,7 @@ func (h *TfeHandler) LockWorkspace(c echo.Context) error { ID: lockInfo.ID, }, } - + if err := jsonapi.MarshalPayload(c.Response().Writer, workspace); err != nil { logger.Error("Failed to marshal workspace payload", "operation", "tfe_lock_workspace", @@ -690,7 +689,7 @@ func (h *TfeHandler) UnlockWorkspace(c echo.Context) error { // Strip ws- prefix to get workspace name workspaceName := convertWorkspaceToStateID(workspaceID) - + // Get org from authentication context (JWT claim or webhook header) orgIdentifier, err := getOrgFromContext(c) if err != nil { @@ -700,18 +699,18 @@ func (h *TfeHandler) UnlockWorkspace(c echo.Context) error { "error", err, ) return c.JSON(http.StatusUnauthorized, map[string]string{ - "error": "Organization context required", + "error": "Organization context required", "detail": err.Error(), }) } - + logger.Info("Unlocking TFE workspace", "operation", "tfe_unlock_workspace", "workspace_id", workspaceID, "workspace_name", workspaceName, "org_identifier", orgIdentifier, ) - + // Resolve to UUID/UUID path stateID, err := h.convertWorkspaceToStateIDWithOrg(c.Request().Context(), orgIdentifier, workspaceName) if err != nil { @@ -722,7 +721,7 @@ func (h *TfeHandler) UnlockWorkspace(c echo.Context) error { "error", err, ) return c.JSON(500, map[string]string{ - "error": "failed to resolve workspace", + "error": "failed to resolve workspace", "detail": err.Error(), }) } @@ -799,15 +798,15 @@ func (h *TfeHandler) UnlockWorkspace(c echo.Context) error { "unit_uuid", unitUUID, "lock_id", currentLock.ID, ) - + // Return success with full workspace object (properly formatted JSON:API) workspace := &tfe.TFEWorkspace{ ID: tfe.NewTfeResourceIdentifier(tfe.WorkspaceType, workspaceName).String(), Name: workspaceName, Locked: false, - CurrentRun: nil, // No lock, so no current run + CurrentRun: nil, // No lock, so no current run } - + if err := jsonapi.MarshalPayload(c.Response().Writer, workspace); err != nil { logger.Error("Failed to marshal workspace payload", "operation", "tfe_unlock_workspace", @@ -832,23 +831,23 @@ func (h *TfeHandler) ForceUnlockWorkspace(c echo.Context) error { // Strip ws- prefix to get workspace name workspaceName := convertWorkspaceToStateID(workspaceID) - + // Get org from authentication context (JWT claim or webhook header) orgIdentifier, err := getOrgFromContext(c) if err != nil { fmt.Printf("ForceUnlockWorkspace: %v\n", err) return c.JSON(http.StatusUnauthorized, map[string]string{ - "error": "Organization context required", + "error": "Organization context required", "detail": err.Error(), }) } - + // Resolve to UUID/UUID path stateID, err := h.convertWorkspaceToStateIDWithOrg(c.Request().Context(), orgIdentifier, workspaceName) if err != nil { fmt.Printf("ForceUnlockWorkspace: failed to resolve workspace: %v\n", err) return c.JSON(500, map[string]string{ - "error": "failed to resolve workspace", + "error": "failed to resolve workspace", "detail": err.Error(), }) } @@ -898,7 +897,7 @@ func (h *TfeHandler) ForceUnlockWorkspace(c echo.Context) error { if requestedLockID != "" && requestedLockID != currentLock.ID { fmt.Printf("ForceUnlockWorkspace: Lock ID mismatch - requested=%s, current=%s\n", requestedLockID, currentLock.ID) return c.JSON(409, map[string]interface{}{ - "error": "lock_id_mismatch", + "error": "lock_id_mismatch", "message": fmt.Sprintf("Lock ID %q does not match existing lock ID %q", requestedLockID, currentLock.ID), "current_lock": map[string]interface{}{ "id": currentLock.ID, @@ -925,9 +924,9 @@ func (h *TfeHandler) ForceUnlockWorkspace(c echo.Context) error { ID: tfe.NewTfeResourceIdentifier(tfe.WorkspaceType, workspaceName).String(), Name: workspaceName, Locked: false, - CurrentRun: nil, // No lock, so no current run + CurrentRun: nil, // No lock, so no current run } - + if err := jsonapi.MarshalPayload(c.Response().Writer, workspace); err != nil { fmt.Printf("ForceUnlockWorkspace: error marshaling workspace payload: %v\n", err) return err @@ -953,7 +952,7 @@ func (h *TfeHandler) GetCurrentStateVersion(c echo.Context) error { // Strip ws- prefix to get workspace name workspaceName := convertWorkspaceToStateID(workspaceID) - + // Get org from authentication context (JWT claim or webhook header) orgIdentifier, err := getOrgFromContext(c) if err != nil { @@ -963,23 +962,23 @@ func (h *TfeHandler) GetCurrentStateVersion(c echo.Context) error { "error", err, ) return c.JSON(http.StatusUnauthorized, map[string]string{ - "error": "Organization context required", + "error": "Organization context required", "detail": err.Error(), }) } - + logger.Info("Getting current state version", "operation", "tfe_get_current_state", "workspace_id", workspaceID, "workspace_name", workspaceName, "org_identifier", orgIdentifier, ) - + // Resolve to UUID/UUID path stateID, err := h.convertWorkspaceToStateIDWithOrg(c.Request().Context(), orgIdentifier, workspaceName) if err != nil { return c.JSON(500, map[string]string{ - "error": "failed to resolve workspace", + "error": "failed to resolve workspace", "detail": err.Error(), }) } @@ -993,18 +992,18 @@ func (h *TfeHandler) GetCurrentStateVersion(c echo.Context) error { } // Check if state exists - + // Extract unit UUID from state ID - repository expects just the UUID unitUUID := extractUnitUUID(stateID) - + stateMeta, err := h.stateStore.Get(c.Request().Context(), unitUUID) - + if err != nil { } if stateMeta != nil { } else { } - + if err != nil { if err == storage.ErrNotFound { return c.JSON(404, map[string]string{ @@ -1064,23 +1063,23 @@ func (h *TfeHandler) CreateStateVersion(c echo.Context) error { // Strip ws- prefix to get workspace name workspaceName := convertWorkspaceToStateID(workspaceID) - + // Get org from authentication context (JWT claim or webhook header) orgIdentifier, err := getOrgFromContext(c) if err != nil { fmt.Printf("CreateStateVersion: %v\n", err) return c.JSON(http.StatusUnauthorized, map[string]string{ - "error": "Organization context required", + "error": "Organization context required", "detail": err.Error(), }) } - + // Resolve to UUID/UUID path stateID, err := h.convertWorkspaceToStateIDWithOrg(c.Request().Context(), orgIdentifier, workspaceName) if err != nil { fmt.Printf("CreateStateVersion: failed to resolve workspace: %v\n", err) return c.JSON(500, map[string]string{ - "error": "failed to resolve workspace", + "error": "failed to resolve workspace", "detail": err.Error(), }) } @@ -1135,7 +1134,6 @@ func (h *TfeHandler) CreateStateVersion(c echo.Context) error { return h.CreateStateVersionDirect(c, workspaceID, stateID, stateBytes) } - // Look for the actual state content - it might be base64 encoded or in a specific field if jsonStateOutputs, exists := attributes["json-state-outputs"]; exists { fmt.Printf("CreateStateVersion: Found json-state-outputs field\n") @@ -1330,7 +1328,7 @@ func (h *TfeHandler) DownloadStateVersion(c echo.Context) error { // Extract unit UUID from state ID - repository expects just the UUID unitUUID := extractUnitUUID(stateID) - + // Download the state data stateData, err := h.directStateStore.Download(c.Request().Context(), unitUUID) if err != nil { @@ -1487,7 +1485,7 @@ func (h *TfeHandler) ShowStateVersion(c echo.Context) error { // Extract unit UUID from state ID - repository expects just the UUID unitUUID := extractUnitUUID(stateID) - + // Load metadata (and optionally content) meta, err := h.stateStore.Get(c.Request().Context(), unitUUID) if err != nil { diff --git a/ui/src/components/UnitCreateForm.tsx b/ui/src/components/UnitCreateForm.tsx index d49369b7f..14e0d4ac2 100644 --- a/ui/src/components/UnitCreateForm.tsx +++ b/ui/src/components/UnitCreateForm.tsx @@ -35,9 +35,9 @@ export default function UnitCreateForm({ const [isCreating, setIsCreating] = React.useState(false) const [error, setError] = React.useState(null) - // Remote runs are gated by localStorage flag for beta testing - // Set localStorage.setItem('REMOTE_RUNS', 'true') to enable - const remoteRunsEnabled = typeof window !== 'undefined' && localStorage.getItem('REMOTE_RUNS') === 'true' +const remoteRunsEnabled = true + + const handleCreate = async () => { if (!unitName.trim()) return