Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions sandbox-sidecar/src/jobs/jobRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export class JobRunner {
logger.info({ job: job.id, runner: this.runner.name }, "sandbox job started");

try {
const result = await this.runner.run(job);
const result = await this.runner.run(job, (chunk) => this.store.appendLogs(job.id, chunk));
this.store.updateStatus(job.id, "succeeded", result.logs);
this.store.setResult(job.id, result.result);
logger.info({ job: job.id }, "sandbox job succeeded");
Expand All @@ -36,4 +36,3 @@ export class JobRunner {
}
}
}

8 changes: 7 additions & 1 deletion sandbox-sidecar/src/jobs/jobStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,17 @@ export class JobStore {
job.updatedAt = new Date();
}

appendLogs(id: string, chunk: string) {
const job = this.jobs.get(id);
if (!job || !chunk) return;
job.logs = `${job.logs}${chunk}`;
job.updatedAt = new Date();
}

setResult(id: string, result: SandboxRunResult | undefined) {
const job = this.jobs.get(id);
if (!job) return;
job.result = result;
job.updatedAt = new Date();
}
}

68 changes: 45 additions & 23 deletions sandbox-sidecar/src/runners/e2bRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ export class E2BSandboxRunner implements SandboxRunner {
}
}

async run(job: SandboxRunRecord): Promise<RunnerOutput> {
async run(job: SandboxRunRecord, appendLog?: (chunk: string) => void): Promise<RunnerOutput> {
if (job.payload.operation === "plan") {
return this.runPlan(job);
return this.runPlan(job, appendLog);
}
return this.runApply(job);
return this.runApply(job, appendLog);
}

private async runPlan(job: SandboxRunRecord): Promise<RunnerOutput> {
private async runPlan(job: SandboxRunRecord, appendLog?: (chunk: string) => void): Promise<RunnerOutput> {
const requestedVersion = job.payload.terraformVersion || "1.5.7";
const requestedEngine = job.payload.engine || "terraform";
const { sandbox, needsInstall } = await this.createSandbox(requestedVersion, requestedEngine);
Expand All @@ -42,28 +42,32 @@ export class E2BSandboxRunner implements SandboxRunner {

const workDir = await this.setupWorkspace(sandbox, job);
const logs: string[] = [];
const streamLog = (chunk: string) => {
if (!chunk) return;
appendLog?.(chunk);
};

// Run terraform init
await this.runTerraformCommand(
sandbox,
workDir,
["init", "-input=false", "-no-color"],
logs,
streamLog,
);

// Run terraform plan
const planArgs = ["plan", "-input=false", "-no-color", "-out=tfplan.binary"];
if (job.payload.isDestroy) {
planArgs.splice(1, 0, "-destroy");
}
await this.runTerraformCommand(sandbox, workDir, planArgs, logs);
await this.runTerraformCommand(sandbox, workDir, planArgs, logs, streamLog);

// Get plan JSON
const showResult = await this.runTerraformCommand(
sandbox,
workDir,
["show", "-json", "tfplan.binary"],
logs,
);

const planJSON = showResult.stdout;
Expand All @@ -76,31 +80,36 @@ export class E2BSandboxRunner implements SandboxRunner {
planJSON: Buffer.from(planJSON, "utf8").toString("base64"),
};

return { logs: logs.join("\n"), result };
return { logs: logs.join(""), result };
} finally {
await sandbox.kill();
}
}

private async runApply(job: SandboxRunRecord): Promise<RunnerOutput> {
private async runApply(job: SandboxRunRecord, appendLog?: (chunk: string) => void): Promise<RunnerOutput> {
const requestedVersion = job.payload.terraformVersion || "1.5.7";
const requestedEngine = job.payload.engine || "terraform";
const { sandbox, needsInstall } = await this.createSandbox(requestedVersion, requestedEngine);
try {
// Install IaC tool if using fallback template
if (needsInstall) {
await this.installIacTool(sandbox, requestedEngine, requestedVersion);
}

const workDir = await this.setupWorkspace(sandbox, job);
const logs: string[] = [];
const requestedEngine = job.payload.engine || "terraform";
const { sandbox, needsInstall } = await this.createSandbox(requestedVersion, requestedEngine);
try {
// Install IaC tool if using fallback template
if (needsInstall) {
await this.installIacTool(sandbox, requestedEngine, requestedVersion);
}

const workDir = await this.setupWorkspace(sandbox, job);
const logs: string[] = [];
const streamLog = (chunk: string) => {
if (!chunk) return;
appendLog?.(chunk);
};

// Run terraform init
await this.runTerraformCommand(
sandbox,
workDir,
["init", "-input=false", "-no-color"],
logs,
streamLog,
);

// Run terraform apply/destroy
Expand All @@ -110,6 +119,7 @@ export class E2BSandboxRunner implements SandboxRunner {
workDir,
[applyCommand, "-auto-approve", "-input=false", "-no-color"],
logs,
streamLog,
);

// Read the state file
Expand All @@ -119,7 +129,7 @@ export class E2BSandboxRunner implements SandboxRunner {
state: Buffer.from(stateContent, "utf8").toString("base64"),
};

return { logs: logs.join("\n"), result };
return { logs: logs.join(""), result };
} finally {
await sandbox.kill();
}
Expand Down Expand Up @@ -262,28 +272,41 @@ export class E2BSandboxRunner implements SandboxRunner {
cwd: string,
args: string[],
logBuffer?: string[],
appendLog?: (chunk: string) => void,
): Promise<{ stdout: string; stderr: string }> {
const engine = (sandbox as any)._requestedEngine || "terraform";
const binaryName = engine === "tofu" ? "tofu" : "terraform";
const cmdStr = `${binaryName} ${args.join(" ")}`;
logger.info({ cmd: cmdStr, cwd, engine }, "running IaC command in E2B sandbox");

let sawStream = false;
const pipeChunk = (chunk: string | undefined) => {
if (!chunk) return;
sawStream = true;
if (logBuffer) {
logBuffer.push(chunk);
}
appendLog?.(chunk);
};

const result = await sandbox.commands.run(cmdStr, {
cwd,
envs: {
TF_IN_AUTOMATION: "1",
},
onStdout: pipeChunk,
onStderr: pipeChunk,
});

const stdout = result.stdout;
const stderr = result.stderr;
const exitCode = result.exitCode;

// Push any remaining buffered output for completeness in final log
const mergedLogs = `${stdout}\n${stderr}`.trim();
if (logBuffer && mergedLogs.length > 0) {
logBuffer.push(mergedLogs);
if (!sawStream && mergedLogs.length > 0) {
pipeChunk(mergedLogs + "\n");
}

if (exitCode !== 0) {
throw new Error(
`${binaryName} ${args[0]} exited with code ${exitCode}\n${mergedLogs}`,
Expand Down Expand Up @@ -330,4 +353,3 @@ export class E2BSandboxRunner implements SandboxRunner {
}
}
}

3 changes: 1 addition & 2 deletions sandbox-sidecar/src/runners/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,5 @@ export interface RunnerOutput {

export interface SandboxRunner {
readonly name: string;
run(job: SandboxRunRecord): Promise<RunnerOutput>;
run(job: SandboxRunRecord, appendLog?: (chunk: string) => void): Promise<RunnerOutput>;
}

22 changes: 15 additions & 7 deletions taco/internal/sandbox/e2b.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (s *e2bSandbox) ExecutePlan(ctx context.Context, req *PlanRequest) (*PlanRe
if req == nil {
return nil, fmt.Errorf("plan request cannot be nil")
}

// Validate engine field is set
if req.Engine == "" {
return nil, fmt.Errorf("engine field is required but was empty")
Expand Down Expand Up @@ -76,7 +76,7 @@ func (s *e2bSandbox) ExecutePlan(ctx context.Context, req *PlanRequest) (*PlanRe
return nil, err
}

status, err := s.waitForCompletion(ctx, jobID)
status, err := s.waitForCompletion(ctx, jobID, req.LogSink)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -109,7 +109,7 @@ func (s *e2bSandbox) ExecuteApply(ctx context.Context, req *ApplyRequest) (*Appl
if req == nil {
return nil, fmt.Errorf("apply request cannot be nil")
}

// Validate engine field is set
if req.Engine == "" {
return nil, fmt.Errorf("engine field is required but was empty")
Expand Down Expand Up @@ -137,7 +137,7 @@ func (s *e2bSandbox) ExecuteApply(ctx context.Context, req *ApplyRequest) (*Appl
return nil, err
}

status, err := s.waitForCompletion(ctx, jobID)
status, err := s.waitForCompletion(ctx, jobID, req.LogSink)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -167,7 +167,7 @@ func (s *e2bSandbox) startRun(ctx context.Context, payload e2bRunRequest) (strin
// Retry logic for transient failures (network issues, sidecar temporarily unavailable)
maxRetries := 3
var lastErr error

for attempt := 1; attempt <= maxRetries; attempt++ {
if attempt > 1 {
// Exponential backoff: 1s, 2s, 4s
Expand Down Expand Up @@ -214,22 +214,30 @@ func (s *e2bSandbox) startRun(ctx context.Context, payload e2bRunRequest) (strin
}
return startResp.ID, nil
}

return "", fmt.Errorf("failed to start sandbox run after %d attempts: %w", maxRetries, lastErr)
}

func (s *e2bSandbox) waitForCompletion(ctx context.Context, runID string) (*e2bRunStatusResponse, error) {
func (s *e2bSandbox) waitForCompletion(ctx context.Context, runID string, onLog func(string)) (*e2bRunStatusResponse, error) {
ctx, cancel := context.WithTimeout(ctx, s.cfg.PollTimeout)
defer cancel()

ticker := time.NewTicker(s.cfg.PollInterval)
defer ticker.Stop()

var lastErr error
lastLen := 0

for {
status, err := s.fetchStatus(ctx, runID)
if err == nil {
// Stream incremental log chunks as we see new bytes
if onLog != nil && len(status.Logs) > lastLen {
chunk := status.Logs[lastLen:]
onLog(chunk)
lastLen = len(status.Logs)
}

switch strings.ToLower(status.Status) {
case "succeeded", "completed", "done":
return status, nil
Expand Down
6 changes: 6 additions & 0 deletions taco/internal/sandbox/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ type PlanRequest struct {
ConfigArchive []byte
State []byte
Metadata map[string]string
// LogSink is an optional callback that receives incremental log chunks
// as they are observed while polling the sandbox run.
LogSink func(chunk string)
}

// PlanResult captures the outcome of a sandboxed plan execution.
Expand Down Expand Up @@ -43,6 +46,9 @@ type ApplyRequest struct {
ConfigArchive []byte
State []byte
Metadata map[string]string
// LogSink is an optional callback that receives incremental log chunks
// as they are observed while polling the sandbox run.
LogSink func(chunk string)
}

// ApplyResult captures the outcome of a sandboxed apply.
Expand Down
62 changes: 19 additions & 43 deletions taco/internal/tfe/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,49 +100,25 @@ func (h *TfeHandler) GetApplyLogs(c echo.Context) error {
return c.JSON(http.StatusNotFound, map[string]string{"error": "apply not found"})
}

// Try to get apply logs from blob storage
var logText string
applyLogBlobID := fmt.Sprintf("runs/%s/apply-logs.txt", run.ID)

logData, err := h.blobStore.DownloadBlob(ctx, applyLogBlobID)
if err == nil {
logText = string(logData)
} else {
// If logs don't exist yet, return placeholder
if run.Status == "applying" || run.Status == "apply_queued" {
logText = "Waiting for apply to start...\n"
} else {
logText = "Apply logs not available\n"
}
}

// Handle offset for streaming with proper byte accounting
// Stream format: [STX at offset 0][logText at offset 1+][ETX at offset 1+len(logText)]
var responseData []byte

if offsetInt == 0 {
// First request: send STX + current logs
responseData = append([]byte{0x02}, []byte(logText)...)
fmt.Printf("📤 APPLY LOGS at offset=0: STX + %d bytes of log text\n", len(logText))
} else {
// Client already received STX (1 byte at offset 0)
// Map stream offset to logText offset: streamOffset=1 → logText[0]
logOffset := offsetInt - 1

if logOffset < int64(len(logText)) {
// Send remaining log text
responseData = []byte(logText[logOffset:])
fmt.Printf("📤 APPLY LOGS at offset=%d: sending %d bytes (logText[%d:])\n",
offsetInt, len(responseData), logOffset)
} else if logOffset == int64(len(logText)) && run.Status == "applied" {
// All logs sent, send ETX
responseData = []byte{0x03}
fmt.Printf("📤 Sending ETX (End of Text) for apply %s - logs complete\n", applyID)
} else {
// Waiting for more logs or already sent ETX
responseData = []byte{}
fmt.Printf("📤 APPLY LOGS at offset=%d: no new data (waiting or complete)\n", offsetInt)
}
responseData, err := streamChunkedLogs(ctx, h.blobStore, logStreamOptions{
Prefix: "applies",
Label: "APPLY",
ID: run.ID,
Offset: offsetInt,
ChunkSize: 2 * 1024,
GenerateDefaultText: func() string {
if run.Status == "applying" || run.Status == "apply_queued" {
return "Waiting for apply to start...\n"
}
return "Apply logs not available\n"
},
IsComplete: func() bool {
return run.Status == "applied" || run.Status == "errored"
},
AppendETXOnFirst: true, // If already complete on first request, send ETX immediately
})
if err != nil {
return err
}

c.Response().Header().Set("Content-Type", "text/plain")
Expand Down
Loading
Loading