Skip to content

Commit

Permalink
many fixes of closing frames, updated client to actually work with la…
Browse files Browse the repository at this point in the history
…test spec
  • Loading branch information
Lawouach committed Jan 3, 2012
1 parent 821639b commit 37788d7
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 57 deletions.
2 changes: 1 addition & 1 deletion example/echo_cherrypy_server.py
Expand Up @@ -10,7 +10,7 @@

class ChatWebSocketHandler(WebSocketHandler):
def received_message(self, m):
cherrypy.engine.publish('websocket-broadcast', str(m))
cherrypy.engine.publish('websocket-broadcast', m)

class Root(object):
def __init__(self, host, port):
Expand Down
10 changes: 7 additions & 3 deletions example/echo_client.py
Expand Up @@ -8,14 +8,18 @@ def data_provider():
yield "#" * i

self.send(data_provider())

for i in range(0, 200, 25):
print i
self.send("*" * i)

def closed(self, code, reason):
print "Closed down", code, reason

def received_message(self, m):
print m, len(str(m))
print "=> %d %s" % (len(m), str(m))
if len(str(m)) == 175:
self.close()
self.close(reason='Bye bye')

if __name__ == '__main__':
try:
Expand Down
36 changes: 18 additions & 18 deletions ws4py/client/__init__.py
Expand Up @@ -13,7 +13,7 @@
__all__ = ['WebSocketBaseClient']

class WebSocketBaseClient(object):
def __init__(self, url, protocols=None, version='8'):
def __init__(self, url, protocols=None, version='13'):
self.stream = Stream()
self.url = url
self.protocols = protocols
Expand Down Expand Up @@ -106,7 +106,7 @@ def terminated(self):
def close(self, reason='', code=1000):
if not self.client_terminated:
self.client_terminated = True
self.write_to_connection(self.stream.close(code=code, reason=reason))
self.write_to_connection(self.stream.close(code=code, reason=reason).single(mask=True))

def connect(self):
raise NotImplemented()
Expand All @@ -121,26 +121,26 @@ def close_connection(self):
raise NotImplemented()

def send(self, payload, binary=False):
message_sender = self.stream.binary_message if binary else self.stream.text_message

if isinstance(payload, basestring):
if not binary:
self.write_to_connection(self.stream.text_message(payload).single(mask=True))
else:
self.write_to_connection(self.stream.binary_message(payload).single(mask=True))
self.write_to_connection(message_sender(payload).single(mask=True))

elif isinstance(payload, dict):
self.write_to_connection(self.stream.text_message(json.dumps(payload)).single(mask=True))
self.write_to_connection(message_sender(json.dumps(payload)).single(mask=True))

elif type(payload) == types.GeneratorType:
bytes = payload.next()
first = True
for chunk in payload:
if not binary:
self.write_to_connection(self.stream.text_message(bytes).fragment(first=first, mask=True))
else:
self.write_to_connection(self.stream.binary_message(bytes).fragment(first=first, mask=True))
bytes = chunk
last = False
bytes = payload.next()

while not last:
try:
peeked_bytes = payload.next()
except StopIteration:
last = True

self.write_to_connection(message_sender(bytes).fragment(first=first, last=last, mask=True))
first = False
if not binary:
self.write_to_connection(self.stream.text_message(bytes).fragment(last=True, mask=True))
else:
self.write_to_connection(self.stream.binary_message(bytes).fragment(last=True, mask=True))
bytes = peeked_bytes

10 changes: 5 additions & 5 deletions ws4py/client/threadedclient.py
Expand Up @@ -13,7 +13,7 @@
__all__ = ['WebSocketClient']

class WebSocketClient(WebSocketBaseClient):
def __init__(self, url, sock=None, protocols=None, version='8'):
def __init__(self, url, sock=None, protocols=None, version='13'):
WebSocketBaseClient.__init__(self, url, protocols=protocols, version=version)
if not sock:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
Expand Down Expand Up @@ -91,11 +91,10 @@ def _receive(self):
with self._lock:
s = self.stream
next_size = s.parser.send(bytes)

if s.closing is not None:
if not self.client_terminated:
next_size = 2
self.close()
else:
self.server_terminated = True
self.running = False
Expand Down Expand Up @@ -124,12 +123,13 @@ def _receive(self):
except:
print "".join(traceback.format_exception(*exc_info()))
finally:
self.close_connection()
pass

if self.stream.closing:
self.closed(self.stream.closing.code, self.stream.closing.reason)
else:
self.closed(1006)

if __name__ == '__main__':
import time

Expand Down
11 changes: 9 additions & 2 deletions ws4py/messaging.py
Expand Up @@ -52,8 +52,9 @@ def fragment(self, first=False, last=False, mask=False):
fin = 1 if last is True else 0
opcode = self.opcode if first is True else OPCODE_CONTINUATION
mask = os.urandom(4) if mask else None
return Frame(body=self.data or '', opcode=opcode,
masking_key=mask, fin=fin).build()
return Frame(body=self.data or '',
opcode=opcode, masking_key=mask,
fin=fin).build()

@property
def completed(self):
Expand Down Expand Up @@ -126,6 +127,12 @@ def __init__(self, code=1000, reason=''):
self.code = code
self.reason = reason

def __str__(self):
return self.reason

def __unicode__(self):
return self.reason.decode('utf-8')

class PingControlMessage(Message):
def __init__(self, data=None):
Message.__init__(self, OPCODE_PING, data)
Expand Down
4 changes: 2 additions & 2 deletions ws4py/server/geventserver.py
Expand Up @@ -87,5 +87,5 @@ def echo_handler(websocket, environ):
finally:
websocket.close()

server = WebSocketServer(('127.0.0.1', 9000), echo_handler)
server.serve_forever()
server = WebSocketServer(('127.0.0.1', 9001), echo_handler)
server.serve_forever()
35 changes: 15 additions & 20 deletions ws4py/server/handler/threadedhandler.py
Expand Up @@ -9,6 +9,7 @@
import types

from ws4py.streaming import Stream
from ws4py.messaging import Message

__all__ = ['WebSocketHandler', 'EchoWebSocketHandler']

Expand Down Expand Up @@ -61,7 +62,7 @@ def close(self, code=1000, reason=''):
"""
if not self.server_terminated:
self.server_terminated = True
self.write_to_connection(self.stream.close(code=code, reason=reason))
self.write_to_connection(self.stream.close(code=code, reason=reason).single())

def closed(self, code, reason=None):
"""
Expand All @@ -71,7 +72,7 @@ def closed(self, code, reason=None):
@param code: status code
@param reason: human readable message of the closing exchange
"""
pass
self.write_to_connection

@property
def terminated(self):
Expand Down Expand Up @@ -136,26 +137,23 @@ def send(self, payload, binary=False):
@param payload: string, bytes, bytearray or a generator
@param binary: if set, handles the payload as a binary message
"""
message_sender = self.stream.binary_message if binary else self.stream.text_message

if isinstance(payload, basestring) or isinstance(payload, bytearray):
if not binary:
self.write_to_connection(self.stream.text_message(payload).single())
else:
self.write_to_connection(self.stream.binary_message(payload).single())
self.write_to_connection(message_sender(payload).single())

elif isinstance(payload, Message):
self.write_to_connection(payload.single())

elif type(payload) == types.GeneratorType:
bytes = payload.next()
first = True
for chunk in payload:
if not binary:
self.write_to_connection(self.stream.text_message(bytes).fragment(first=first))
else:
self.write_to_connection(self.stream.binary_message(payload).fragment(first=first))
self.write_to_connection(message_sender(bytes).fragment(first=first))
bytes = chunk
first = False
if not binary:
self.write_to_connection(self.stream.text_message(bytes).fragment(last=True))
else:
self.write_to_connection(self.stream.text_message(bytes).fragment(last=True))

self.write_to_connection(message_sender(bytes).fragment(last=True))

def _receive(self):
"""
Expand Down Expand Up @@ -221,11 +219,8 @@ def _receive(self):
self.client_terminated = self.server_terminated = True

try:
if not self.server_terminated:
if self.stream.closing:
self.closed(self.stream.closing.code, self.stream.closing.reason)
else:
self.closed(1006)
if not self.stream.closing:
self.closed(1006)
finally:
self.close_connection()

Expand All @@ -235,5 +230,5 @@ class EchoWebSocketHandler(WebSocketHandler):
it receives.
"""
def received_message(self, m):
self.send(m.data, m.is_binary)
self.send(m, m.is_binary)

14 changes: 8 additions & 6 deletions ws4py/streaming.py
Expand Up @@ -113,7 +113,7 @@ def close(self, code=1000, reason=''):
@param reason: status message
@return: bytes representing a close control single framed message
"""
return CloseControlMessage(code=code, reason=reason).single()
return CloseControlMessage(code=code, reason=reason)

def ping(self, data=''):
"""
Expand Down Expand Up @@ -156,7 +156,6 @@ def receiver(self):
the data provider.
"""
utf8validator = Utf8Validator()

running = True
while running:
frame = Frame()
Expand All @@ -169,8 +168,10 @@ def receiver(self):
frame.parser.send(bytes)
except StopIteration:
bytes = frame.body or ''
if frame.masking_key and bytes:
if frame.masking_key:
bytes = frame.unmask(bytes)
else:
bytes = bytearray(frame.body)

if frame.opcode == OPCODE_TEXT:
if self.message and not self.message.completed:
Expand Down Expand Up @@ -232,7 +233,8 @@ def receiver(self):
else:
if len(bytes) > 2:
try:
reason = frame.body[2:].decode("utf-8")
msg = bytes[2:] if frame.masking_key else frame.body[2:]
reason = msg.decode("utf-8")
except UnicodeDecodeError:
code = 1007
reason = ''
Expand Down Expand Up @@ -265,6 +267,6 @@ def receiver(self):
break

frame.parser.close()

utf8validator.reset()
utf8validator.reset()
utf8validator = None

0 comments on commit 37788d7

Please sign in to comment.