Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 30 additions & 8 deletions src/vs/platform/agentHost/node/agentHostChangesetCoordinator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,30 @@ export class ChangesetSessionCoordinator extends Disposable {
}
}

/**
* Restores the parent session when `resource` is a changeset URI and the
* parent session is not already live. Non-changeset URIs are ignored.
*
* This is intentionally narrower than {@link tryHandleSubscribe}: it does
* not compute per-turn / compare changesets and does not register static
* changesets. It exists for the AgentService subscribe path where
* `addSubscriber` may have already created a placeholder changeset snapshot
* before the parent session restore had a chance to apply persisted diffs.
*/
async restoreSessionIfChangesetSubscription(resource: URI, restoreSession: (session: URI) => Promise<void>): Promise<void> {
const resourceStr = resource.toString();
const parsed = parseChangesetUri(resourceStr);
if (!parsed) {
return;
}
if (parsed.kind === ChangesetKind.Unknown) {
throw new Error(`Cannot subscribe to unknown changeset resource: ${resourceStr}`);
}
if (!this._stateManager.getSessionState(parsed.sessionUri)) {
await restoreSession(URI.parse(parsed.sessionUri));
}
}

/**
* If `resource` is a known changeset URI (uncommitted / session /
* turn), seeds its state on the state manager and returns `true`.
Expand All @@ -251,9 +275,7 @@ export class ChangesetSessionCoordinator extends Disposable {
if (parsed.kind === ChangesetKind.Unknown) {
throw new Error(`Cannot subscribe to unknown changeset resource: ${resourceStr}`);
}
if (!this._stateManager.getSessionState(parsed.sessionUri)) {
await restoreSession(URI.parse(parsed.sessionUri));
}
await this.restoreSessionIfChangesetSubscription(resource, restoreSession);
if (parsed.kind === ChangesetKind.Turn && parsed.turnId) {
await this._changesets.computeTurnChangeset(parsed.sessionUri, parsed.turnId);
} else if (parsed.kind === ChangesetKind.Compare && parsed.originalTurnId && parsed.modifiedTurnId) {
Expand Down Expand Up @@ -333,15 +355,15 @@ export class ChangesetSessionCoordinator extends Disposable {
if (uncommittedRaw === undefined && sessionRaw === undefined && legacyRaw === undefined) {
return entry;
}
const restored = this._changesets.restorePersistedStaticChangesets(sessionStr, {
const restored = this._changesets.parsePersistedStaticChangesets(sessionStr, {
uncommittedRaw,
sessionRaw,
legacyRaw,
});
// `restorePersistedStaticChangesets` seeds the state manager; the
// catalogue itself is built here for unopened sessions only. Once
// the session is opened via `restoreSession`, the live overlay in
// `AgentService.listSessions` replaces this.
// `listSessions` must not seed full changeset state for every row;
// it only parses persisted blobs enough to render catalogue counts.
// Once the session is opened via `restoreSession`, the live overlay in
// `AgentService.listSessions` replaces this parse-only catalogue.
if (!liveSessionState) {
const catalogue = buildCatalogueFromPersistedDiffs(sessionStr, restored.uncommitted, restored.session);
if (catalogue) {
Expand Down
69 changes: 55 additions & 14 deletions src/vs/platform/agentHost/node/agentHostChangesetService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,23 +234,45 @@ export interface IAgentHostChangesetService {

/**
* Parses the persisted changeset metadata blobs (`uncommitted`,
* `session`, and the legacy `diffs` fallback for `session`), applies
* each parsed file list via {@link restoreStaticChangeset}, and returns
* the parsed diffs so the caller can pass them into
* {@link buildCatalogueFromPersistedDiffs} for the session-list overlay.
* `session`, and the legacy `diffs` fallback for `session`) without
* mutating live state. Intended for list overlays that only need
* aggregate catalogue counts and should not pin full changeset state in
* memory.
*/
parsePersistedStaticChangesets(sessionUri: ProtocolURI, metadata: IPersistedChangesetMetadata): IRestoredChangesetDiffs;

/**
* Applies parsed persisted changeset diffs to live state via
* {@link restoreStaticChangeset}. This is the side-effectful half of
* persisted restore and should only be used on real restore/subscribe
* paths that need a subscribable changeset snapshot.
*
* Honours `seedIfEmpty`: when a live changeset state already has files
* for the same kind, persisted diffs are NOT applied (they would
* otherwise overwrite the live state).
*/
applyPersistedStaticChangesets(sessionUri: ProtocolURI, diffs: IRestoredChangesetDiffs): void;

/**
* Compatibility wrapper that parses persisted changeset metadata and then
* applies it to live state. New list-overlay callers should prefer
* {@link parsePersistedStaticChangesets}; restore/subscribe callers can
* use this method when they intentionally want both parse and seed.
*
* The `AgentService` orchestration boundary batches the metadata read
* (custom title + read / archive flags + config values + these three
* blobs) in a single database round-trip, then hands the raw values
* here; the service does not open the database itself for this method.
*
* Honours `seedIfEmpty`: when a live changeset state already has files
* for the same kind, persisted diffs are NOT applied (they would
* otherwise overwrite the live state). Malformed JSON is logged and
* the corresponding slot is left `undefined`.
*/
restorePersistedStaticChangesets(sessionUri: ProtocolURI, metadata: IPersistedChangesetMetadata): IRestoredChangesetDiffs;

/**
* Returns true when the static changeset identified by `changesetUri` is
* currently being recomputed. Used by cache eviction to avoid dropping a
* slot while its producer is mid-flight.
*/
isStaticChangesetComputeActive(changesetUri: ProtocolURI): boolean;

/**
* Lazy refresh of the uncommitted changeset, kicked off when a client
* first subscribes to `<session>/changeset/uncommitted`.
Expand Down Expand Up @@ -330,6 +352,7 @@ export class AgentHostChangesetService extends Disposable implements IAgentHostC
private readonly _debouncedDiffTimers = this._register(new DisposableMap<string>());
/** Per-`(session, turnId)` debounce timers for mid-turn per-turn changeset recomputation. */
private readonly _perTurnDebouncedDiffTimers = this._register(new DisposableMap<string>());
private readonly _activeStaticComputes = new Set<ProtocolURI>();
private static readonly _DIFF_DEBOUNCE_MS = 5000;

/**
Expand Down Expand Up @@ -371,22 +394,34 @@ export class AgentHostChangesetService extends Disposable implements IAgentHostC
this._publishChangesetDiffs(session, changesetUri, diffs);
}

restorePersistedStaticChangesets(sessionUri: ProtocolURI, metadata: IPersistedChangesetMetadata): IRestoredChangesetDiffs {
parsePersistedStaticChangesets(sessionUri: ProtocolURI, metadata: IPersistedChangesetMetadata): IRestoredChangesetDiffs {
const persistedUncommitted = tryParsePersistedDiffs(metadata.uncommittedRaw, sessionUri, 'uncommitted', this._logService);
// Legacy `diffs` is the migration fallback for the session-wide
// changeset only — it never carried uncommitted state.
const persistedSession = tryParsePersistedDiffs(metadata.sessionRaw, sessionUri, 'session', this._logService)
?? tryParsePersistedDiffs(metadata.legacyRaw, sessionUri, 'session (legacy)', this._logService);

return { uncommitted: persistedUncommitted, session: persistedSession };
}

applyPersistedStaticChangesets(sessionUri: ProtocolURI, diffs: IRestoredChangesetDiffs): void {
// `seedIfEmpty`: only reseed persisted diffs when the matching live
// changeset state is absent or empty. Live state (e.g. from a prior
// refresh in this lifetime) is always more authoritative than a
// potentially-stale persisted blob; without this guard a fresh
// `restorePersistedStaticChangesets` call would clobber it.
this._seedIfEmpty(sessionUri, 'uncommitted', persistedUncommitted);
this._seedIfEmpty(sessionUri, 'session', persistedSession);
this._seedIfEmpty(sessionUri, 'uncommitted', diffs.uncommitted);
this._seedIfEmpty(sessionUri, 'session', diffs.session);
}

return { uncommitted: persistedUncommitted, session: persistedSession };
restorePersistedStaticChangesets(sessionUri: ProtocolURI, metadata: IPersistedChangesetMetadata): IRestoredChangesetDiffs {
const parsed = this.parsePersistedStaticChangesets(sessionUri, metadata);
this.applyPersistedStaticChangesets(sessionUri, parsed);
return parsed;
}

isStaticChangesetComputeActive(changesetUri: ProtocolURI): boolean {
return this._activeStaticComputes.has(changesetUri);
}

private _seedIfEmpty(session: ProtocolURI, kind: StaticChangesetKind, diffs: readonly ISessionFileDiff[] | undefined): void {
Expand Down Expand Up @@ -662,14 +697,18 @@ export class AgentHostChangesetService extends Disposable implements IAgentHostC
}

private async _doComputeStaticChangeset(session: ProtocolURI, kind: StaticChangesetKind, changedTurnId?: string): Promise<void> {
const changesetUri = staticChangesetUri(session, kind);
this._activeStaticComputes.add(changesetUri);
let ref: ReturnType<ISessionDataService['openDatabase']>;
try {
ref = this._sessionDataService.openDatabase(URI.parse(session));
} catch (err) {
this._logService.warn(`[AgentHostChangesetService] Failed to open session database for ${kind} diff computation: ${session}`, err);
this._activeStaticComputes.delete(changesetUri);
this._stateManager.onChangesetLivenessChanged();
return;
}
const changesetUri = this._stateManager.registerChangeset(staticChangesetUri(session, kind));
this._stateManager.registerChangeset(changesetUri);
try {
let diffs = await this._tryComputeGitDiffs(session, ref.object, kind);
if (!diffs) {
Expand Down Expand Up @@ -717,6 +756,8 @@ export class AgentHostChangesetService extends Disposable implements IAgentHostC
error: { errorType: 'computeFailed', message: err instanceof Error ? err.message : String(err) },
});
} finally {
this._activeStaticComputes.delete(changesetUri);
this._stateManager.onChangesetLivenessChanged();
ref.dispose();
}
}
Expand Down
126 changes: 126 additions & 0 deletions src/vs/platform/agentHost/node/agentHostChangesetStateCache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/

import { LinkedMap, Touch } from '../../../base/common/map.js';
import { ChangesetStatus, type ChangesetState, type URI } from '../common/state/sessionState.js';

/**
* Default number of expanded changeset states kept hot in memory.
*
* This cache only stores the subscribable `ChangesetState` payloads. The
* lightweight catalogue on `SessionSummary.changesets` remains on the session
* summary, and static changesets can be rehydrated from persisted metadata or
* recomputed on demand. The limit is intentionally a soft cap: subscribed or
* actively-computing changesets may pin the cache above this value until they
* become evictable.
*/
const DEFAULT_CHANGESET_STATE_SOFT_LIMIT = 500;

export interface IAgentHostChangesetStateRetentionOptions {
/**
* Number of expanded changeset states kept hot in memory. The limit is soft:
* entries for which {@link canEvict} returns false may temporarily keep the
* cache above this value.
*/
readonly softLimit?: number;

/**
* Returns whether a changeset state can be silently evicted from the cache.
* Production callers should provide this from `AgentService`, which owns
* protocol subscription refcounts and can ask the changeset service about
* active producers. Return false for changesets that are subscribed or have
* an active producer that may still publish into the changeset URI.
*/
readonly canEvict?: (changeset: URI) => boolean;
}

/**
* Owns the memory policy for expanded changeset states.
*
* The state manager owns protocol sequencing and reducer application; this
* helper owns the cache mechanics needed to keep dormant changesets bounded.
* Eviction here is deliberately silent: protocol-visible teardown still goes
* through `AgentHostStateManager.disposeChangeset`, which emits
* `ChangesetCleared` before removing state.
*/
export class AgentHostChangesetStateCache {

private readonly _states = new Map<string, ChangesetState>();
private readonly _lru = new LinkedMap<string, true>();
private readonly _softLimit: number;
private readonly _canEvict: (changeset: URI) => boolean;

constructor(options: IAgentHostChangesetStateRetentionOptions = {}) {
this._softLimit = Math.max(0, options.softLimit ?? DEFAULT_CHANGESET_STATE_SOFT_LIMIT);
this._canEvict = options.canEvict ?? (() => true);
}

keys(): IterableIterator<string> {
return this._states.keys();
}

has(changeset: URI): boolean {
return this._states.has(changeset);
}

get(changeset: URI): ChangesetState | undefined {
this._touch(changeset);
return this._states.get(changeset);
}

set(changeset: URI, state: ChangesetState): void {
this._states.set(changeset, state);
this._touch(changeset);
this._evictIfOverLimit();
}

delete(changeset: URI): void {
this._states.delete(changeset);
this._lru.delete(changeset);
}

register(changeset: URI, initialStatus: ChangesetStatus = ChangesetStatus.Computing): void {
if (this._states.has(changeset)) {
this._touch(changeset);
return;
}
this.set(changeset, { status: initialStatus, files: [] });
}

/** Re-runs eviction after external liveness changes, such as unsubscribe or compute completion. */
trimEvictableEntries(): void {
this._evictIfOverLimit();
}

private _touch(changeset: URI): void {
if (this._states.has(changeset)) {
this._lru.set(changeset, true, Touch.AsNew);
}
}

private _evictIfOverLimit(): void {
if (this._softLimit === 0) {
for (const changeset of [...this._lru.keys()]) {
if (this._canEvict(changeset)) {
this.delete(changeset);
}
}
return;
}

for (const changeset of [...this._lru.keys()]) {
if (this._states.size <= this._softLimit) {
return;
}
if (!this._states.has(changeset)) {
this._lru.delete(changeset);
continue;
}
if (this._canEvict(changeset)) {
this.delete(changeset);
}
}
}
}
Loading
Loading