Skip to content

Commit

Permalink
Introduce sendMessage websocket api which allows for callbacks (#166)
Browse files Browse the repository at this point in the history
* Introduce sendMessage websocket api which allows for callbacks, deprecate lastPingId in favor of callbacks

* Let InspectTrafficPlugin handle all pushed inspection events
  • Loading branch information
abhinavsingh committed Nov 12, 2019
1 parent d20cf1c commit 7ca7c2d
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 25 deletions.
12 changes: 8 additions & 4 deletions dashboard/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import threading
import multiprocessing
import uuid
from typing import List, Tuple, Optional, Any
from typing import List, Tuple, Optional, Any, Dict

from proxy.http.server import HttpWebServerPlugin, HttpWebServerBasePlugin, httpProtocolTypes
from proxy.http.parser import HttpParser
Expand Down Expand Up @@ -86,7 +86,7 @@ def on_websocket_message(self, frame: WebsocketFrame) -> None:
return

if message['method'] == 'ping':
self.reply_pong(message['id'])
self.reply({'id': message['id'], 'response': 'pong'})
elif message['method'] == 'enable_inspection':
# inspection can only be enabled if --enable-events is used
if not self.flags.enable_events:
Expand All @@ -112,10 +112,13 @@ def on_websocket_message(self, frame: WebsocketFrame) -> None:
self.relay_sub_id = uuid.uuid4().hex
self.event_queue.subscribe(
self.relay_sub_id, self.relay_channel)

self.reply({'id': message['id'], 'response': 'inspection_enabled'})
elif message['method'] == 'disable_inspection':
if self.inspection_enabled:
self.shutdown_relay()
self.inspection_enabled = False
self.reply({'id': message['id'], 'response': 'inspection_disabled'})
else:
logger.info(frame.data)
logger.info(frame.opcode)
Expand All @@ -140,11 +143,11 @@ def on_websocket_close(self) -> None:
if self.inspection_enabled:
self.shutdown_relay()

def reply_pong(self, idd: int) -> None:
def reply(self, data: Dict[str, Any]) -> None:
self.client.queue(
WebsocketFrame.text(
bytes_(
json.dumps({'id': idd, 'response': 'pong'}))))
json.dumps(data))))

@staticmethod
def relay_events(
Expand All @@ -154,6 +157,7 @@ def relay_events(
while not shutdown.is_set():
try:
ev = channel.get(timeout=1)
ev['push'] = 'inspect_traffic'
client.queue(
WebsocketFrame.text(
bytes_(
Expand Down
6 changes: 5 additions & 1 deletion dashboard/src/core/plugins/inspect_traffic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ export class InspectTrafficPlugin extends DashboardPlugin {
}

public activated (): void {
this.websocketApi.enableInspection()
this.websocketApi.enableInspection(this.handleEvents.bind(this))
}

public deactivated (): void {
this.websocketApi.disableInspection()
}

public handleEvents (message: Record<string, any>): void {
console.log(message)
}
}
55 changes: 36 additions & 19 deletions dashboard/src/core/ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
:license: BSD, see LICENSE for more details.
*/

type MessageHandler = (message: Record<string, any>) => void

export class WebsocketApi {
private hostname: string = window.location.hostname ? window.location.hostname : 'localhost';
private port: number = window.location.port ? Number(window.location.port) : 8899;
Expand All @@ -18,16 +20,17 @@ export class WebsocketApi {
private wsPath: string = this.wsScheme + '://' + this.hostname + ':' + this.port + this.wsPrefix;

private mid: number = 0;
private lastPingId: number;
private lastPingTime: number;

private readonly schedulePingEveryMs: number = 1000;
private readonly scheduleReconnectEveryMs: number = 5000;

private serverPingTimer: number;
private serverConnectTimer: number;
private serverPingTimer: number = null;
private serverConnectTimer: number = null;

private inspectionEnabled: boolean;
private inspectionEnabled: boolean = false;
private inspectionCallback: MessageHandler = null;
private callbacks: Map<number, MessageHandler> = new Map()

constructor () {
this.scheduleServerConnect(0)
Expand All @@ -38,17 +41,17 @@ export class WebsocketApi {
return date.getTime()
}

public enableInspection () {
public enableInspection (eventCallback?: MessageHandler) {
// TODO: Set flag to true only once response has been received from the server
this.inspectionEnabled = true
this.ws.send(JSON.stringify({ id: this.mid, method: 'enable_inspection' }))
this.mid++
this.inspectionCallback = eventCallback
this.sendMessage({ method: 'enable_inspection' })
}

public disableInspection () {
this.inspectionEnabled = false
this.ws.send(JSON.stringify({ id: this.mid, method: 'disable_inspection' }))
this.mid++
this.inspectionCallback = null
this.sendMessage({ method: 'disable_inspection' })
}

private scheduleServerConnect (after_ms: number = this.scheduleReconnectEveryMs) {
Expand Down Expand Up @@ -80,11 +83,16 @@ export class WebsocketApi {
}

private pingServer () {
this.lastPingId = this.mid
this.lastPingTime = WebsocketApi.getTime()
this.mid++
// console.log('Pinging server with id:%d', this.last_ping_id);
this.ws.send(JSON.stringify({ id: this.lastPingId, method: 'ping' }))
this.sendMessage({ method: 'ping' }, this.handlePong.bind(this))
}

private handlePong (message: Record<string, any>) {
WebsocketApi.setServerStatusSuccess(
String((WebsocketApi.getTime() - this.lastPingTime) + ' ms'))
this.clearServerPingTimer()
this.scheduleServerPing()
}

private clearServerPingTimer () {
Expand All @@ -93,7 +101,6 @@ export class WebsocketApi {
this.serverPingTimer = null
}
this.lastPingTime = null
this.lastPingId = null
}

private onServerWSOpen (ev: MessageEvent) {
Expand All @@ -102,13 +109,23 @@ export class WebsocketApi {
this.scheduleServerPing(0)
}

public sendMessage (data: Record<string, any>, callback?: MessageHandler) {
data.id = this.mid
if (callback) {
this.callbacks.set(this.mid, callback)
}
this.mid++
this.ws.send(JSON.stringify(data))
}

private onServerWSMessage (ev: MessageEvent) {
const message = JSON.parse(ev.data)
if (message.id === this.lastPingId) {
WebsocketApi.setServerStatusSuccess(
String((WebsocketApi.getTime() - this.lastPingTime) + ' ms'))
this.clearServerPingTimer()
this.scheduleServerPing()
const message: Record<string, any> = JSON.parse(ev.data)
if (message.push !== undefined && message.push === 'inspect_traffic' && this.inspectionCallback !== null) {
this.inspectionCallback(message)
} else if (this.callbacks.has(message.id)) {
const callback = this.callbacks.get(message.id)
this.callbacks.delete(message.id)
callback(message)
} else {
console.log(message)
}
Expand Down
2 changes: 1 addition & 1 deletion dashboard/src/proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export class ProxyDashboard {
private static plugins: IPluginConstructor[] = [];
private plugins: Map<string, IDashboardPlugin> = new Map();

private websocketApi: WebsocketApi
private readonly websocketApi: WebsocketApi

constructor () {
this.websocketApi = new WebsocketApi()
Expand Down

0 comments on commit 7ca7c2d

Please sign in to comment.