Skip to content

Commit

Permalink
Migrate IPC to tornado
Browse files Browse the repository at this point in the history
  • Loading branch information
ha-D committed Oct 1, 2016
1 parent ce26910 commit b194016
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 72 deletions.
6 changes: 3 additions & 3 deletions cachebrowser/api/handlers/process.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@


def close(request):
def close(context, request):
import os
import signal

Expand All @@ -10,5 +10,5 @@ def close(request):
os._exit(0)


def ping(request):
request.reply({'afd': 12})
def ping(context, request):
request.reply("pong")
255 changes: 188 additions & 67 deletions cachebrowser/ipc.py
Original file line number Diff line number Diff line change
@@ -1,101 +1,222 @@
import json
from threading import Thread
import traceback
import uuid
import logging

from websocket_server import WebsocketServer
import tornado.ioloop
import tornado.web
import tornado.websocket

from cachebrowser.api.core import api_manager, APIRequest

logger = logging.getLogger(__name__)

class IPCManager(object):
def __init__(self, context):
self.context = context

self.websocket_server = IPCWebSocket(self, context.settings.ipc_port)
self.websocket_server.start()
class IPCRouter(object):
def __init__(self):
self.clients = {}
self.channels = {}
self.rpc_clients = {}
self.rpc_pending_requests = {}

self.subscribtions = {}
def add_client(self, client_id, client):
self.clients[client_id] = client

api_manager.register_api('/subscribe', self.on_subscribe)
api_manager.register_api('/unsubscribe', self.on_unsubscribe)
def remove_client(self, client_id):
if client_id in self.clients:
del self.clients[client_id]

def publish(self, channel, message):
mes = {
'channel': channel,
'data': message,
'message_type': 'publish'
}
self.websocket_server.broadcast(mes)

def on_message(self, client, message):
route = message['route']
params = message.get('params', {})
req_id = message['request_id']

request = IPCRequest(req_id, self, client, route, params)
api_manager.handle_api_request(request)

def send_response(self, request, response):
self.websocket_server.send(request.client, {
'message_type': 'response',
'request_id': request.id,
'response': response
})
if channel not in self.channels:
return

dangling_clients = []

for client_id in self.channels[channel]:
if client_id not in self.clients:
dangling_clients.append(client_id)
else:
self.clients[client_id].send_publish(channel, message)

for client_id in dangling_clients:
self.channels[channel].remove(client_id)

def subscribe(self, client_id, channel):
if channel not in self.channels:
self.channels[channel] = set()
self.channels[channel].add(client_id)

def unsubscribe(self, client_id, channel):
if channel in self.channels and client_id in self.channels[channel]:
self.channels[channel].remove(client_id)

def rpc_request(self, client_id, request_id, method, params):
if method not in self.rpc_clients:
# TODO Give error
pass

self.rpc_pending_requests[request_id] = client_id
target_client_id = self.rpc_clients.get(method, None)
if target_client_id is not None:
client = self.clients.get(target_client_id, None)
if client is not None:
client.send_rpc_request(request_id, method, params)

# TODO give error if route doesn't exist

def rpc_response(self, request_id, response):
client_id = self.rpc_pending_requests.pop(request_id)
client = self.clients.get(client_id, None)
if client is not None:
client.send_rpc_response(request_id, response)

def register_rpc(self, client_id, method):
# TODO give error if already registered
self.rpc_clients[method] = client_id


class IPCClient(object):
"""
General rule: these methods shouldn't need to interact with the router. They are called from the router
"""
def send_publish(self, channel, message):
pass

def on_subscribe(self, request):
# TODO Manage subcription to avoid redundant publishing
def send_rpc_request(self, request_id, method, params):
pass

def on_unsubscribe(self, request):
def send_rpc_response(self, request_id, response):
pass


class IPCWebSocket(object):
def __init__(self, ipc, port):
self.ipc = ipc
self.server = WebsocketServer(port)
self._set_callbacks(self.server)
class IPCManager(IPCClient):
def __init__(self, context):
self.context = context

self.router = IPCRouter()

self.id = 'local_client'
self.router.add_client(self.id, self)

self.clients = []
self.websocket = self.initialize_websocket_server(context.settings.ipc_port)

def start(self):
from threading import Thread
def initialize_websocket_server(self, port):
app = tornado.web.Application([
(r'/', WebSocketIPCClient, {'router': self.router}),
])

def run():
self.server.run_forever()
t = Thread(target=run)
def run_loop():
logger.debug("Starting IPC websocket server on port {}".format(port))
ioloop = tornado.ioloop.IOLoop()
ioloop.make_current()
app.listen(port)
ioloop.start()

t = Thread(target=run_loop)
t.daemon = True
t.start()

def _set_callbacks(self, server):
server.set_fn_new_client(self.on_connect)
server.set_fn_client_left(self.on_disconnect)
server.set_fn_message_received(self.on_message)
return app

def on_connect(self, client, server):
self.clients.append(client)
def publish(self, channel, message):
self.router.publish(channel, message)

def on_disconnect(self, client, server):
self.clients = [c for c in self.clients if c['id'] != client['id']]
def subscribe(self, channel, callback):
raise NotImplementedError("Local subscriptions not implemented")

def on_message(self, client, server, message):
message = json.loads(message)
self.ipc.on_message(client, message)
def register_rpc(self, method, handler):
logger.debug("Registering RPC method {}".format(method))
self.router.register_rpc(self.id, method)

def broadcast(self, message):
message = json.dumps(message)
for c in self.clients:
self.server.send_message(c, message)
def register_rpc_handlers(self, routes):
for route in routes:
self.register_rpc(route[0], route[1])

def send(self, client, message):
self.server.send_message(client, json.dumps(message))
def send_publish(self, channel, message):
raise NotImplementedError("Local subscriptions not implemented")

def send_rpc_request(self, request_id, method, params):
request = RPCRequest(self.router, request_id, method, params)
api_manager.handle_api_request(self.context, request)

class IPCRequest(APIRequest):
def __init__(self, request_id, ipc, client, route, params):
def send_rpc_response(self, request_id, response):
raise NotImplementedError()


class RPCRequest(APIRequest):
def __init__(self, router, request_id, route, params):
self.id = request_id
self.ipc = ipc
self.client = client
self.router = router

super(IPCRequest, self).__init__(route, params)
super(RPCRequest, self).__init__(route, params)

def reply(self, response=None):
self.ipc.send_response(self, response)
self.router.rpc_response(self.id, response)


class WebSocketIPCClient(tornado.websocket.WebSocketHandler, IPCClient):
def initialize(self, router=None):
self.id = str(uuid.uuid4())[:8]
self.router = router

def open(self, *args, **kwargs):
self.router.add_client(self.id, self)

def on_close(self):
self.router.remove_client(self.id)

def on_message(self, message):
try:
json_message = json.loads(message)
except ValueError:
logger.error("IPC: received invalid json message:\n{}".format(message))
else:
self.handle_message(json_message)

def handle_message(self, message):
message_type = message.get('type', None)

if message_type is None:
logger.error("IPC: received message with no type")

try:
if message_type == 'pub':
self.router.publish(message['channel'], message['message'])
elif message_type == 'sub':
self.router.subscribe(self.id, message['channel'])
elif message_type == 'unsub':
self.router.unsubscribe(self.id, message['channel'])
elif message_type == 'rpc_req':
self.router.rpc_request(self.id, message['request_id'], message['method'], message.get('params', {}))
except Exception as e:
logger.error("Uncaught exception occurred while handling IPC message: {}".format(message))
traceback.print_exc()

def send_publish(self, channel, message):
self.send({
'type': 'pub',
'channel': channel,
'message': message
})

def send_rpc_request(self, request_id, method, params):
self.send({
'type': 'rpc_req',
'request_id': request_id,
'method': method,
'params': params
})

def send_rpc_response(self, request_id, response):
self.send({
'type': 'rpc_resp',
'request_id': request_id,
'message': response
})

def send(self, message):
self.write_message(message)

def check_origin(self, origin):
return True
2 changes: 2 additions & 0 deletions cachebrowser/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from cachebrowser.pipes.sni import SNIPipe
from cachebrowser.settings import DevelopmentSettings, ProductionSettings, SettingsValidationError
from cachebrowser.ipc import IPCManager
from cachebrowser.api.routes import routes as api_routes
from cachebrowser import cli

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -83,6 +84,7 @@ def cachebrowser(click_context, config, verbose, reset_db, dev, **kwargs):
def start_cachebrowser_server(context):
logger.debug("Initializing IPC")
ipc = IPCManager(context)
ipc.register_rpc_handlers(api_routes)

config = mitmproxy.proxy.ProxyConfig(port=context.settings.port)
server = ProxyServer(config)
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ mitmproxy==0.17
click==6.6
peewee==2.8.1
ipwhois==0.13.0
websocket_server==0.4
appdirs==1.4.0
PyYAML==3.11
tornado==4.3
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
'click>=6.6',
'PyYAML',
'peewee',
'websocket-server',
'tornado',
'appdirs',
'ipwhois',
],
Expand Down

0 comments on commit b194016

Please sign in to comment.