Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 145 additions & 44 deletions src/session-manager/adapters/opencode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ const SSE_TIMEOUT_MS = 120000;
const serverPorts = new Map<string, number>();
const serverStarting = new Map<string, Promise<number>>();

let hostServerPort: number | null = null;
let hostServerStarting: Promise<number> | null = null;
let hostServerProcess: Subprocess<'ignore', 'pipe', 'pipe'> | null = null;

async function findAvailablePort(containerName: string): Promise<number> {
const script = `import socket; s=socket.socket(); s.bind(('', 0)); print(s.getsockname()[1]); s.close()`;
const result = await execInContainer(containerName, ['python3', '-c', script], {
Expand Down Expand Up @@ -119,6 +123,7 @@ export class OpenCodeAdapter implements AgentAdapter {
private model?: string;
private status: SessionStatus = 'idle';
private port?: number;
private isHost = false;
private sseProcess: Subprocess<'ignore', 'pipe', 'pipe'> | null = null;

private messageCallback?: MessageCallback;
Expand All @@ -138,25 +143,91 @@ export class OpenCodeAdapter implements AgentAdapter {
}

async start(options: AdapterStartOptions): Promise<void> {
if (options.isHost) {
throw new Error('OpenCode adapter does not support host mode');
}

this.isHost = options.isHost;
this.containerName = options.containerName;
this.agentSessionId = options.agentSessionId;
this.model = options.model;

try {
this.port = await startServer(this.containerName!);
if (this.isHost) {
this.port = await this.startServerHost();
} else {
this.port = await startServer(this.containerName!);
}
this.setStatus('idle');
} catch (err) {
this.emitError(err as Error);
throw err;
}
}

private async startServerHost(): Promise<number> {
if (hostServerPort && (await this.isServerRunningHost(hostServerPort))) {
return hostServerPort;
}

if (hostServerStarting) {
return hostServerStarting;
}

const startPromise = (async () => {
const port = await this.findAvailablePortHost();

console.log(`[opencode] Starting server on port ${port} on host`);

hostServerProcess = Bun.spawn(
['opencode', 'serve', '--port', String(port), '--hostname', '127.0.0.1'],
{
stdin: 'ignore',
stdout: 'pipe',
stderr: 'pipe',
}
);

for (let i = 0; i < 30; i++) {
await new Promise((resolve) => setTimeout(resolve, 500));
if (await this.isServerRunningHost(port)) {
console.log(`[opencode] Server ready on port ${port}`);
hostServerPort = port;
hostServerStarting = null;
return port;
}
}

hostServerStarting = null;
if (hostServerProcess) {
hostServerProcess.kill();
await hostServerProcess.exited;
hostServerProcess = null;
}
throw new Error('Failed to start OpenCode server on host');
})();

hostServerStarting = startPromise;
return startPromise;
}

private async findAvailablePortHost(): Promise<number> {
const server = Bun.serve({
port: 0,
fetch: () => new Response(''),
});
const port = server.port!;
server.stop();
return port;
Comment on lines +210 to +217
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The findAvailablePortHost function doesn't await server.stop(), creating a race condition where the port may be returned before it's fully released by the OS.
Severity: HIGH

🔍 Detailed Analysis

In findAvailablePortHost, the call to server.stop() is not awaited. Since server.stop() is an asynchronous operation that returns a Promise, the function returns the port number before the temporary server has fully shut down and released the port to the operating system. This creates a race condition. If the subsequent code immediately tries to bind to this port, it may fail with an "address already in use" error, leading to intermittent failures when starting the OpenCode server.

💡 Suggested Fix

The server.stop() call should be awaited to ensure the port is fully released before the function returns. Change server.stop(); to await server.stop(); in the findAvailablePortHost function.

🤖 Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: src/session-manager/adapters/opencode.ts#L210-L217

Potential issue: In `findAvailablePortHost`, the call to `server.stop()` is not awaited.
Since `server.stop()` is an asynchronous operation that returns a Promise, the function
returns the port number before the temporary server has fully shut down and released the
port to the operating system. This creates a race condition. If the subsequent code
immediately tries to bind to this port, it may fail with an "address already in use"
error, leading to intermittent failures when starting the OpenCode server.

Did we get this right? 👍 / 👎 to inform future reviews.
Reference ID: 8418176

}

private async isServerRunningHost(port: number): Promise<boolean> {
try {
const response = await fetch(`http://localhost:${port}/session`, { method: 'GET' });
return response.ok;
} catch {
return false;
}
}

async sendMessage(message: string): Promise<void> {
if (!this.containerName || !this.port) {
if (!this.port) {
const err = new Error('Adapter not started');
this.emitError(err);
throw err;
Expand Down Expand Up @@ -193,7 +264,24 @@ export class OpenCodeAdapter implements AgentAdapter {
}

private async createSession(baseUrl: string): Promise<string> {
const payload = this.model ? JSON.stringify({ model: this.model }) : '{}';
const payload = this.model ? { model: this.model } : {};

if (this.isHost) {
const response = await fetch(`${baseUrl}/session`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload),
signal: AbortSignal.timeout(MESSAGE_TIMEOUT_MS),
});

if (!response.ok) {
throw new Error(`Failed to create session: ${response.statusText}`);
}

const session = await response.json();
return session.id;
}

const result = await execInContainer(
this.containerName!,
[
Expand All @@ -208,7 +296,7 @@ export class OpenCodeAdapter implements AgentAdapter {
'-H',
'Content-Type: application/json',
'-d',
payload,
JSON.stringify(payload),
],
{ user: 'workspace' }
);
Expand All @@ -226,28 +314,42 @@ export class OpenCodeAdapter implements AgentAdapter {

await new Promise((resolve) => setTimeout(resolve, 100));

const payload = JSON.stringify({ parts: [{ type: 'text', text: message }] });
const result = await execInContainer(
this.containerName!,
[
'curl',
'-s',
'-f',
'--max-time',
String(MESSAGE_TIMEOUT_MS / 1000),
'-X',
'POST',
`${baseUrl}/session/${this.agentSessionId}/message`,
'-H',
'Content-Type: application/json',
'-d',
payload,
],
{ user: 'workspace' }
);
const payload = { parts: [{ type: 'text', text: message }] };

if (result.exitCode !== 0) {
throw new Error(`Failed to send message: ${result.stderr || 'Connection failed'}`);
if (this.isHost) {
const response = await fetch(`${baseUrl}/session/${this.agentSessionId}/message`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload),
signal: AbortSignal.timeout(MESSAGE_TIMEOUT_MS),
});

if (!response.ok) {
throw new Error(`Failed to send message: ${response.statusText}`);
}
} else {
const result = await execInContainer(
this.containerName!,
[
'curl',
'-s',
'-f',
'--max-time',
String(MESSAGE_TIMEOUT_MS / 1000),
'-X',
'POST',
`${baseUrl}/session/${this.agentSessionId}/message`,
'-H',
'Content-Type: application/json',
'-d',
JSON.stringify(payload),
],
{ user: 'workspace' }
);

if (result.exitCode !== 0) {
throw new Error(`Failed to send message: ${result.stderr || 'Connection failed'}`);
}
}

await sseReady;
Expand All @@ -259,21 +361,20 @@ export class OpenCodeAdapter implements AgentAdapter {
let resolved = false;
let receivedIdle = false;

const proc = Bun.spawn(
[
'docker',
'exec',
'-i',
this.containerName!,
'curl',
'-s',
'-N',
'--max-time',
String(SSE_TIMEOUT_MS / 1000),
`http://localhost:${this.port}/event`,
],
{ stdin: 'ignore', stdout: 'pipe', stderr: 'pipe' }
);
const curlArgs = [
'curl',
'-s',
'-N',
'--max-time',
String(SSE_TIMEOUT_MS / 1000),
`http://localhost:${this.port}/event`,
];

const spawnArgs = this.isHost
? curlArgs
: ['docker', 'exec', '-i', this.containerName!, ...curlArgs];

const proc = Bun.spawn(spawnArgs, { stdin: 'ignore', stdout: 'pipe', stderr: 'pipe' });

this.sseProcess = proc;
const decoder = new TextDecoder();
Expand Down