Skip to content
This repository has been archived by the owner on Dec 13, 2018. It is now read-only.

Commit

Permalink
disposal #4: Refactor Socket class out of DebuggerConnection
Browse files Browse the repository at this point in the history
Summary:
DebuggerConnection was getting pretty gnarly, so this pulls out the nitty gritty websocket munging
logic into its own abstraction.

Differential Revision: D4505739

fbshipit-source-id: 2f413efd6b1ab68dec1c1a8ca0f26ee8290d16eb
  • Loading branch information
johnislarry authored and facebook-github-bot committed Feb 4, 2017
1 parent 0b31118 commit 71e849d
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 61 deletions.
70 changes: 9 additions & 61 deletions pkg/nuclide-debugger-iwdp-rpc/lib/DebuggerConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,14 @@
*/

import UniversalDisposable from '../../commons-node/UniversalDisposable';
import WS from 'ws';
import {Observable, BehaviorSubject, Subject} from 'rxjs';
import {createWebSocketListener} from './createWebSocketListener';
import {logger} from './logger';
import {RUNNING, PAUSED, ENDED} from './constants';
import invariant from 'assert';
import {Socket} from './Socket';

import type {DeviceInfo, RuntimeStatus} from './types';
import type {AnyTeardown} from '../../commons-node/UniversalDisposable';

type Id = number;
type onResponseReceived = (response: Object) => void;

const {log} = logger;

/**
Expand All @@ -37,73 +32,30 @@ const {log} = logger;
* `subscribeToEvents` API, which accepts a callback called when events are emitted from the target.
*/
export class DebuggerConnection {
_webSocket: ?WS;
_webSocketPromise: Promise<WS>;
_disposables: UniversalDisposable;
_status: BehaviorSubject<RuntimeStatus>;
_pendingRequests: Map<Id, onResponseReceived>;
_id: number;
_events: Subject<Object>;
_connectionId: number;
_deviceInfo: DeviceInfo;
_webSocketClosed: boolean;
_socket: Socket;

constructor(connectionId: number, deviceInfo: DeviceInfo) {
this._deviceInfo = deviceInfo;
this._connectionId = connectionId;
this._webSocket = null;
this._events = new Subject();
this._id = 0;
this._pendingRequests = new Map();
this._webSocketClosed = false;
this._status = new BehaviorSubject(RUNNING);
const {webSocketDebuggerUrl} = deviceInfo;
const webSocket = new WS(webSocketDebuggerUrl);
// It's not enough to just construct the websocket -- we have to also wait for it to open.
this._webSocketPromise = new Promise(resolve => webSocket.on('open', () => resolve(webSocket)));
webSocket.on(
'close',
() => {
this._webSocketClosed = true;
this._status.next(ENDED);
},
);
const socketMessages: Observable<string> = createWebSocketListener(webSocket);
this._disposables = new UniversalDisposable(
() => {
if (!this._webSocketClosed) {
webSocket.close();
}
},
socketMessages.subscribe(message => this._handleSocketMessage(message)),
this._socket = new Socket(
webSocketDebuggerUrl,
this._handleChromeEvent.bind(this),
() => this._status.next(ENDED),
);
this._disposables = new UniversalDisposable(this._socket);
log(`DebuggerConnection created with device info: ${JSON.stringify(deviceInfo)}`);
}

async sendCommand(message: Object): Promise<Object> {
if (this._webSocket == null) {
this._webSocket = await this._webSocketPromise;
}
const webSocket = this._webSocket;
if (message.id == null) {
message.id = this._id++;
}
return new Promise(resolve => {
this._pendingRequests.set(message.id, resolve);
webSocket.send(JSON.stringify(message));
});
}

_handleSocketMessage(message: string): void {
const obj = JSON.parse(message);
if (isEvent(obj)) {
this._handleChromeEvent(obj);
} else {
const resolve = this._pendingRequests.get(obj.id);
invariant(resolve != null, `Got response for a request that wasn't sent: ${message}`);
this._pendingRequests.delete(obj.id);
resolve(obj);
}
sendCommand(message: Object): Promise<Object> {
return this._socket.sendCommand(message);
}

_handleChromeEvent(message: Object): void {
Expand Down Expand Up @@ -156,7 +108,3 @@ export class DebuggerConnection {
this._disposables.dispose();
}
}

function isEvent(obj: Object): boolean {
return obj.id == null;
}
96 changes: 96 additions & 0 deletions pkg/nuclide-debugger-iwdp-rpc/lib/Socket.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Copyright (c) 2015-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the license found in the LICENSE file in
* the root directory of this source tree.
*
* @flow
*/

import UniversalDisposable from '../../commons-node/UniversalDisposable';
import WS from 'ws';
import {createWebSocketListener} from './createWebSocketListener';
import invariant from 'assert';

import type {Observable} from 'rxjs';

type Id = number;
type onResponseReceived = (response: Object) => void;

export class Socket {
_webSocket: ?WS;
_webSocketOpenPromise: Promise<WS>;
_disposables: UniversalDisposable;
_pendingRequests: Map<Id, onResponseReceived>;
_webSocketClosed: boolean;
_id: number;
_handleChromeEvent: (message: Object) => mixed;

constructor(
url: string,
handleChromeEvent: (message: Object) => mixed,
handleSocketEnd: () => mixed,
) {
this._id = 0;
this._handleChromeEvent = handleChromeEvent;
this._webSocket = null;
this._pendingRequests = new Map();
this._webSocketClosed = false;
const webSocket = new WS(url);
// It's not enough to just construct the websocket -- we have to also wait for it to open.
this._webSocketOpenPromise = new Promise(
resolve => webSocket.on('open', () => resolve(webSocket)),
);
webSocket.on(
'close',
() => {
this._webSocketClosed = true;
handleSocketEnd();
},
);
const socketMessages: Observable<string> = createWebSocketListener(webSocket);
this._disposables = new UniversalDisposable(
() => {
if (!this._webSocketClosed) {
webSocket.close();
}
},
socketMessages.subscribe(message => this._handleSocketMessage(message)),
);
}

async sendCommand(message: Object): Promise<Object> {
if (this._webSocket == null) {
this._webSocket = await this._webSocketOpenPromise;
}
const webSocket = this._webSocket;
if (message.id == null) {
message.id = this._id++;
}
return new Promise(resolve => {
this._pendingRequests.set(message.id, resolve);
webSocket.send(JSON.stringify(message));
});
}

_handleSocketMessage(message: string): void {
const obj = JSON.parse(message);
if (isEvent(obj)) {
this._handleChromeEvent(obj);
} else {
const resolve = this._pendingRequests.get(obj.id);
invariant(resolve != null, `Got response for a request that wasn't sent: ${message}`);
this._pendingRequests.delete(obj.id);
resolve(obj);
}
}

dispose(): void {
this._disposables.dispose();
}
}

function isEvent(obj: Object): boolean {
return obj.id == null;
}

0 comments on commit 71e849d

Please sign in to comment.