Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

refactoring

  • Loading branch information...
commit ffaec59eeb388cab841411d85b18fb1047c0f0b8 1 parent 59f80ec
@frutik authored
View
2  etc/supervisord.conf
@@ -5,7 +5,7 @@ user=nobody
[program:ws_rabbit]
directory=/opt/ucall/utils/ws-rabbit
-command=python /opt/ucall/utils/ws-rabbit/tornado-run.py
+command=python /opt/ucall/utils/ws-rabbit/tornado-run.py --port=@@stomp.ws_port@@
user=nobody
;[program:hornetq]
View
19 fabfile.py
@@ -36,20 +36,17 @@ def deploy():
def local_deploy():
_deploy(local)
-def update_requirements(cmd):
- cmd('pip install -r ' + web_dir + 'requirements1.txt --upgrade')
- cmd('cd ' + web_dir + ' && pip install -r ' + web_dir + 'requirements2.txt --upgrade')
+def update_requirements():
+ sudo('pip install -r ' + web_dir + 'requirements1.txt --upgrade')
-def update_requirements_remote():
- update_requirements(sudo)
-
-def update_requirements_local():
- update_requirements(local)
+def update_src_requirements():
+ sudo('cd ' + web_dir + ' && pip install -r ' + web_dir + 'requirements2.txt --upgrade')
def restore_configs():
sudo('cp /opt/etc/config,ini ' + web_dir + 'etc/')
def build_source():
+ sudo('rm -rf /tmp/ucall/')
local('ant prepare_3rd_parties copy_source fill_properties')
def build_tarball():
@@ -63,12 +60,10 @@ def all():
build_tarball()
upload_tarball()
deploy()
- update_requirements_remote()
+ update_requirements()
def local_all():
build_source()
- build_tarball()
- local_deploy()
- update_requirements_local()
+ update_requirements()
View
1  sample.properties
@@ -8,6 +8,7 @@ ami.host=127.0.0.1
ami.username=me
ami.password=mysecret
stomp.host=127.0.0.1
+stomp.ws_port=8888
stomp.username=guest
stomp.password=guest
stomp.exchange=ucall222
View
79 utils/ws-rabbit/agent_channel_ws.py
@@ -1,7 +1,7 @@
import tornado.websocket as websocket
import pika
import uuid
-from stompy.frame import Frame
+from stomp import StompFrame
class AgentChannelWebSocket(websocket.WebSocketHandler):
def open(self):
@@ -9,61 +9,41 @@ def open(self):
self.queue_name = str(uuid.uuid1())
- def parse_headers(self, headers_str):
- """Parse headers received from the servers and convert
- to a :class:`dict`.i
-
- :param headers_str: String to parse headers from
-
- """
- # george:constanza\nelaine:benes
- # -> {"george": "constanza", "elaine": "benes"}
-
- headers = {}
- message_raw = headers_str.split("\n")
- for line in message_raw[1:]:
- tokens = line.split(":")
- if len(tokens) == 2 and tokens[0]:
- headers[tokens[0].strip()] = tokens[1].strip()
-
- return headers
-
def on_message(self, message):
- pika.log.info('PikaClient: WebSocket got message, TODO send it to somebody?')
+ pika.log.info('PikaClient: WebSocket got message')
- request = Frame()
+ request = StompFrame()
c = request.parse_command(message)
- h = self.parse_headers(message)
+ h = self.parse_headers(message)
- response = Frame()
if c == 'CONNECT':
- response.build_frame({"command": 'CONNECTED', "headers": {}})
-
- elif c == 'SUBSCRIBE':
- self.agent_id = str(h['destination'])
- pika.log.info(self.agent_id)
- self.application.pika.channel.queue_declare(exclusive=True, queue=self.queue_name, callback=self.on_queue_declared)
+ response = StompFrame.connected()
- response.build_frame({"command": 'OK', "headers": {}})
+ elif c == 'SUBSCRIBE':
+ self.agent_id = str(h['destination'])
+ pika.log.info(self.agent_id)
+ self.application.pika.channel.queue_declare(exclusive=True, queue=self.queue_name, callback=self.on_queue_declared)
+
+ response = StompFrame.ok()
+
+ elif c == 'UNSUBSCRIBE':
+ #TODO same as on_close() but not delete queueu????
+ response = StompFrame.ok()
- elif c == 'UNSUBSCRIBE':
- #TODO same as on_close() but not delete queueu????
- response.build_frame({"command": 'OK', "headers": {}})
-
self.write_message(response.as_string())
def on_close(self):
pika.log.info('PikaClient: WebSocket closed, TODO cancel consumming stuff')
- self.application.pika.channel.queue_unbind(
- callback=self.on_queue_unbound,
- queue=self.queue_name,
- exchange=self.application.pika.exchange_name,
- routing_key=self.agent_id
- )
+ self.application.pika.channel.queue_unbind(
+ callback=self.on_queue_unbound,
+ queue=self.queue_name,
+ exchange=self.application.pika.exchange_name,
+ routing_key=self.agent_id
+ )
def on_queue_unbound(self, method):
- self.application.pika.channel.queue_delete(callback=None, queue=self.queue_name)
+ self.application.pika.channel.queue_delete(callback=None, queue=self.queue_name)
def on_queue_declared(self, frame):
pika.log.info('PikaClient: Queue Declared, Binding Queue')
@@ -82,17 +62,8 @@ def on_queue_bound(self, frame):
def on_pika_message(self, channel, method, header, body):
- pika.log.info('PikaCient: Message receive, delivery tag #%i' % method.delivery_tag)
-
- self.write_stomp_message(body)
+ pika.log.info('PikaCient: Message receive, delivery tag #%i' % method.delivery_tag)
- def write_stomp_message(self, message):
- self.write_message(self.get_stomp_frame('MESSAGE', body=message).as_string())
+ response = StompFrame.message(body)
- def get_stomp_frame(self, command, headers={}, body=None):
- assert command
-
- f = Frame()
- f.build_frame({"command": command, "headers": headers, "body": body})
-
- return f
+ self.write_message(response.as_string())
View
116 utils/ws-rabbit/stomp.py
@@ -0,0 +1,116 @@
+class StompFrame(object):
+
+ @staticmethod
+ def ok():
+ #TODO no such command - use empty message with weird byte
+
+ f = StompFrame()
+ f.build_frame({"command": 'OK'})
+
+ return f
+
+ @staticmethod
+ def connected():
+ f = StompFrame()
+
+ f.build_frame({
+ "command": 'CONNECTED',
+ "headers": {},
+ "body": None
+ })
+
+ return f
+
+ @staticmethod
+ def message(message):
+ # assert command
+ f = StompFrame()
+
+ f.build_frame({
+ "command": 'MESSAGE',
+ "headers": {},
+ "body": message
+ })
+
+ return f
+
+ def parse_headers(self, headers_str):
+ """Parse headers received from the servers and convert
+ to a :class:`dict`.i
+
+ :param headers_str: String to parse headers from
+
+ """
+ # george:constanza\nelaine:benes
+ # -> {"george": "constanza", "elaine": "benes"}
+
+ headers = {}
+ message_raw = headers_str.split("\n")
+ for line in message_raw[1:]:
+ tokens = line.split(":")
+ if len(tokens) == 2 and tokens[0]:
+ headers[tokens[0].strip()] = tokens[1].strip()
+
+ return headers
+
+ def build_frame(self, args, want_receipt=False):
+ """Build a frame based on a :class:`dict` of arguments.
+
+ :param args: A :class:`dict` of arguments for the frame.
+
+ :keyword want_receipt: Optional argument to get a receipt from
+ the sever that the frame was received.
+
+ Example
+
+ >>> frame = frameobj.build_frame({"command": 'CONNECT',
+ "headers": {},
+ want_receipt=True)
+ """
+ self.command = args.get('command')
+ self.headers = args.get('headers')
+ self.body = args.get('body')
+ if want_receipt:
+ receipt_stamp = str(random.randint(0, 10000000))
+ self.headers["receipt"] = "%s-%s" % (
+ self.session.get("session"), receipt_stamp)
+ return self
+
+ def as_string(self):
+ """Raw string representation of this frame
+ Suitable for passing over a socket to the STOMP server.
+
+ Example
+
+ >>> stomp.send(frameobj.as_string())
+
+ """
+ command = self.command
+ headers = self.headers
+ body = self.body
+
+ bytes_message = False
+ if 'bytes_message' in headers:
+ bytes_message = True
+ del headers['bytes_message']
+ headers['content-length'] = len(body)
+ headers['x-client'] = self.my_name
+
+ # Convert and append any existing headers to a string as the
+ # protocol describes.
+ headerparts = ("%s:%s\n" % (key, value)
+ for key, value in headers.iteritems())
+
+ # Frame is Command + Header + EOF marker.
+ frame = "%s\n%s\n%s\x00" % (command, "".join(headerparts), body)
+
+ return frame
+
+ def parse_command(self, command_str):
+ """Parse command received from the server.
+
+ :param command_str: String to parse command from
+
+ """
+ command = command_str.split('\n', 1)[0]
+ return command
View
2  web/simple-demo/connector.html
@@ -9,7 +9,7 @@
<link type="text/css" href='extras/jquery/js/jquery.toastmessage.css' rel="stylesheet" />
<script src='stomp.js'></script>
<script>
- var queue = 'ws://127.0.0.1:8888/agent/';
+ var queue = 'ws://@@stomp.host@@:@@stomp.ws_port@@/agent/';
</script>
<script src='connector1.js'></script>
</head>

0 comments on commit ffaec59

Please sign in to comment.
Something went wrong with that request. Please try again.