Skip to content

Commit

Permalink
chore: dispose stale handles to prevent oom, 1000 of a kind max (micr…
Browse files Browse the repository at this point in the history
  • Loading branch information
pavelfeldman committed Sep 26, 2023
1 parent 6181960 commit ffd20f4
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 18 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/tests_stress.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,5 @@ jobs:
if: always()
- run: npm run stest browsers -- --project=firefox
if: always()
- run: npm run stest heap -- --project=chromium
if: always()
6 changes: 4 additions & 2 deletions packages/playwright-core/src/client/channelOwner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export abstract class ChannelOwner<T extends channels.Channel = channels.Channel
_logger: Logger | undefined;
readonly _instrumentation: ClientInstrumentation;
private _eventToSubscriptionMapping: Map<string, string> = new Map();
_wasCollected: boolean = false;

constructor(parent: ChannelOwner | Connection, type: string, guid: string, initializer: channels.InitializerTraits<T>) {
super();
Expand Down Expand Up @@ -114,15 +115,16 @@ export abstract class ChannelOwner<T extends channels.Channel = channels.Channel
child._parent = this;
}

_dispose() {
_dispose(reason: 'gc' | undefined) {
// Clean up from parent and connection.
if (this._parent)
this._parent._objects.delete(this._guid);
this._connection._objects.delete(this._guid);
this._wasCollected = reason === 'gc';

// Dispose all children.
for (const object of [...this._objects.values()])
object._dispose();
object._dispose(reason);
this._objects.clear();
}

Expand Down
4 changes: 3 additions & 1 deletion packages/playwright-core/src/client/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ export class Connection extends EventEmitter {
async sendMessageToServer(object: ChannelOwner, type: string, method: string, params: any, stackTrace: ParsedStackTrace | null, wallTime: number | undefined): Promise<any> {
if (this._closedErrorMessage)
throw new Error(this._closedErrorMessage);
if (object._wasCollected)
throw new Error('The object has been collected to prevent unbounded heap growth.');

const { apiName, frames } = stackTrace || { apiName: '', frames: [] };
const guid = object._guid;
Expand Down Expand Up @@ -170,7 +172,7 @@ export class Connection extends EventEmitter {
}

if (method === '__dispose__') {
object._dispose();
object._dispose(params.reason);
return;
}

Expand Down
60 changes: 47 additions & 13 deletions packages/playwright-core/src/server/dispatchers/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ export function existingDispatcher<DispatcherType>(object: any): DispatcherType
return object[dispatcherSymbol];
}

let maxDispatchers = 1000;
export function setMaxDispatchersForTest(value: number | undefined) {
maxDispatchers = value || 1000;
}

export class Dispatcher<Type extends { guid: string }, ChannelType, ParentScopeType extends DispatcherScope> extends EventEmitter implements channels.Channel {
private _connection: DispatcherConnection;
// Parent is always "isScope".
Expand All @@ -55,18 +60,18 @@ export class Dispatcher<Type extends { guid: string }, ChannelType, ParentScopeT
this._parent = parent instanceof DispatcherConnection ? undefined : parent;

const guid = object.guid;
assert(!this._connection._dispatchers.has(guid));
this._connection._dispatchers.set(guid, this);
this._guid = guid;
this._type = type;
this._object = object;

(object as any)[dispatcherSymbol] = this;

this._connection.registerDispatcher(this);
if (this._parent) {
assert(!this._parent._dispatchers.has(guid));
this._parent._dispatchers.set(guid, this);
}

this._type = type;
this._guid = guid;
this._object = object;

(object as any)[dispatcherSymbol] = this;
if (this._parent)
this._connection.sendCreate(this._parent, type, guid, initializer, this._parent._object);
}
Expand Down Expand Up @@ -100,9 +105,9 @@ export class Dispatcher<Type extends { guid: string }, ChannelType, ParentScopeT
this._connection.sendEvent(this, method as string, params, sdkObject);
}

_dispose() {
_dispose(reason?: 'gc') {
this._disposeRecursively();
this._connection.sendDispose(this);
this._connection.sendDispose(this, reason);
}

protected _onDispose() {
Expand All @@ -115,8 +120,9 @@ export class Dispatcher<Type extends { guid: string }, ChannelType, ParentScopeT
eventsHelper.removeEventListeners(this._eventListeners);

// Clean up from parent and connection.
if (this._parent)
this._parent._dispatchers.delete(this._guid);
this._parent?._dispatchers.delete(this._guid);
const list = this._connection._dispatchersByType.get(this._type);
list?.delete(this._guid);
this._connection._dispatchers.delete(this._guid);

// Dispose all children.
Expand Down Expand Up @@ -159,6 +165,8 @@ export class RootDispatcher extends Dispatcher<{ guid: '' }, any, any> {

export class DispatcherConnection {
readonly _dispatchers = new Map<string, DispatcherScope>();
// Collect stale dispatchers by type.
readonly _dispatchersByType = new Map<string, Set<string>>();
onmessage = (message: object) => {};
private _waitOperations = new Map<string, CallMetadata>();
private _isLocal: boolean;
Expand All @@ -183,8 +191,8 @@ export class DispatcherConnection {
this._sendMessageToClient(parent._guid, dispatcher._type, '__adopt__', { guid: dispatcher._guid });
}

sendDispose(dispatcher: DispatcherScope) {
this._sendMessageToClient(dispatcher._guid, dispatcher._type, '__dispose__', {});
sendDispose(dispatcher: DispatcherScope, reason?: 'gc') {
this._sendMessageToClient(dispatcher._guid, dispatcher._type, '__dispose__', { reason });
}

private _sendMessageToClient(guid: string, type: string, method: string, params: any, sdkObject?: SdkObject) {
Expand Down Expand Up @@ -224,6 +232,32 @@ export class DispatcherConnection {
throw new ValidationError(`${path}: expected dispatcher ${names.toString()}`);
}

registerDispatcher(dispatcher: DispatcherScope) {
assert(!this._dispatchers.has(dispatcher._guid));
this._dispatchers.set(dispatcher._guid, dispatcher);
const type = dispatcher._type;

let list = this._dispatchersByType.get(type);
if (!list) {
list = new Set();
this._dispatchersByType.set(type, list);
}
list.add(dispatcher._guid);
if (list.size > maxDispatchers)
this._disposeStaleDispatchers(type, list);
}

private _disposeStaleDispatchers(type: string, dispatchers: Set<string>) {
const dispatchersArray = [...dispatchers];
this._dispatchersByType.set(type, new Set(dispatchersArray.slice(maxDispatchers / 10)));
for (let i = 0; i < maxDispatchers / 10; ++i) {
const d = this._dispatchers.get(dispatchersArray[i]);
if (!d)
continue;
d._dispose('gc');
}
}

async dispatch(message: object) {
const { id, guid, method, params, metadata } = message as any;
const dispatcher = this._dispatchers.get(guid);
Expand Down
41 changes: 39 additions & 2 deletions tests/stress/heap.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { contextTest as test, expect } from '../config/browserTest';
import { queryObjectCount } from '../config/queryObjects';

test.describe.configure({ mode: 'serial' });
test.skip(({ browserName }) => browserName !== 'chromium');

for (let i = 0; i < 3; ++i) {
test(`test #${i} to request page and context`, async ({ page, context }) => {
Expand Down Expand Up @@ -65,7 +66,7 @@ test('should not leak dispatchers after closing page', async ({ context, server
expect(await queryObjectCount(require('../../packages/playwright-core/lib/server/page').Page)).toBe(COUNT);
expect(await queryObjectCount(require('../../packages/playwright-core/lib/server/dispatchers/networkDispatchers').RequestDispatcher)).toBe(COUNT);
expect(await queryObjectCount(require('../../packages/playwright-core/lib/server/dispatchers/networkDispatchers').ResponseDispatcher)).toBe(COUNT);
expect(await queryObjectCount(require('../../packages/playwright-core/lib/server/dispatchers/consoleMessageDispatcher').ConsoleMessageDispatcher)).toBe(COUNT);
expect(await queryObjectCount(require('../../packages/playwright-core/lib/server/console').ConsoleMessage)).toBe(0);

for (const page of pages)
await page.close();
Expand All @@ -74,10 +75,46 @@ test('should not leak dispatchers after closing page', async ({ context, server
expect(await queryObjectCount(require('../../packages/playwright-core/lib/server/page').Page)).toBe(0);
expect(await queryObjectCount(require('../../packages/playwright-core/lib/server/dispatchers/networkDispatchers').RequestDispatcher)).toBe(0);
expect(await queryObjectCount(require('../../packages/playwright-core/lib/server/dispatchers/networkDispatchers').ResponseDispatcher)).toBe(0);
expect(await queryObjectCount(require('../../packages/playwright-core/lib/server/dispatchers/consoleMessageDispatcher').ConsoleMessageDispatcher)).toBe(0);
expect(await queryObjectCount(require('../../packages/playwright-core/lib/server/console').ConsoleMessage)).toBe(0);

expect(await queryObjectCount(require('../../packages/playwright-core/lib/client/page').Page)).toBeLessThan(COUNT);
expect(await queryObjectCount(require('../../packages/playwright-core/lib/server/page').Page)).toBe(0);
expect(await queryObjectCount(require('../../packages/playwright-core/lib/client/network').Request)).toBe(0);
expect(await queryObjectCount(require('../../packages/playwright-core/lib/client/network').Response)).toBe(0);
});

test.describe(() => {
test.beforeEach(() => {
require('../../packages/playwright-core/lib/server/dispatchers/dispatcher').setMaxDispatchersForTest(100);
});

test('should collect stale handles', async ({ page, server }) => {
page.on('request', () => {});
const response = await page.goto(server.PREFIX + '/title.html');
for (let i = 0; i < 200; ++i) {
await page.evaluate(async () => {
const response = await fetch('/');
await response.text();
});
}
const e = await response.allHeaders().catch(e => e);
expect(e.message).toContain('The object has been collected to prevent unbounded heap growth.');

const counts = [
{ count: await queryObjectCount(require('../../packages/playwright-core/lib/client/network').Request), message: 'client.Request' },
{ count: await queryObjectCount(require('../../packages/playwright-core/lib/client/network').Response), message: 'client.Response' },
{ count: await queryObjectCount(require('../../packages/playwright-core/lib/server/network').Request), message: 'server.Request' },
{ count: await queryObjectCount(require('../../packages/playwright-core/lib/server/network').Response), message: 'server.Response' },
{ count: await queryObjectCount(require('../../packages/playwright-core/lib/server/dispatchers/networkDispatchers').RequestDispatcher), message: 'dispatchers.RequestDispatcher' },
{ count: await queryObjectCount(require('../../packages/playwright-core/lib/server/dispatchers/networkDispatchers').ResponseDispatcher), message: 'dispatchers.ResponseDispatcher' },
];
for (const { count, message } of counts) {
expect(count, { message }).toBeGreaterThan(50);
expect(count, { message }).toBeLessThan(150);
}
});

test.afterEach(() => {
require('../../packages/playwright-core/lib/server/dispatchers/dispatcher').setMaxDispatchersForTest(null);
});
});

0 comments on commit ffd20f4

Please sign in to comment.