Skip to content

Commit

Permalink
正常连接淘宝websocket服务器,并且能拉取到数据
Browse files Browse the repository at this point in the history
  • Loading branch information
period331 committed Aug 22, 2014
1 parent 13c75fd commit 4aafb5b
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 27 deletions.
2 changes: 1 addition & 1 deletion taobaowspy/event.py
Expand Up @@ -25,5 +25,5 @@ def off(self, name, callback):

def fire(self, name, *args, **kwargs):
for ev in self.__listeners[name]:
ev(*args, kwargs)
ev(*args, **kwargs)

40 changes: 22 additions & 18 deletions taobaowspy/message.py
Expand Up @@ -4,13 +4,13 @@
from datetime import datetime
import types

from taobaowspy.messagetype import MessageType
from messagetype import MessageType


class Reader(object):
class _Reader(object):

@staticmethod
def read(stream):
@classmethod
def read(cls, stream):
""" 读取消息数据 """

def unpack_from_wrap(fmt, offset):
Expand All @@ -27,21 +27,21 @@ def unpack_from_wrap(fmt, offset):

while header_type != _header_type.endOfHeaders:
if header_type == _header_type.custom:
key, message.offset = self._read_counted_str(stream, message.offset)
key, message.offset = cls._read_counted_str(stream, message.offset)

value, message.offset = self._read_custom_value(stream, message.offset)
value, message.offset = cls._read_custom_value(stream, message.offset)

message.content[key] = value
elif header_type == _header_type.statusCode:
message.status_code = unpack_from_wrap('I', message.offset)[0]
message.update_offset(calcsize('<I'))
elif header_type == _header_type.statusPhrase:
message.status_phrase, message.offset = self._read_counted_str(stream, message.offset)
message.status_phrase, message.offset = cls._read_counted_str(stream, message.offset)
elif header_type == _header_type.flag:
message.flag = unpack_from_wrap('I', message.offset)[0]
message.update_offset(calcsize('<I'))
elif header_type == _header_type.token:
message.token, message.offset = self._read_counted_str(stream, message.offset)
message.token, message.offset = cls._read_counted_str(stream, message.offset)

header_type = unpack_from_wrap('H', message.offset)[0]
message.update_offset(calcsize('<H'))
Expand All @@ -61,8 +61,8 @@ def _read_counted_str(stream, offset):
else:
return None, offset + calcsize('<I')

@staticmethod
def _read_custom_value(stream, offset):
@classmethod
def _read_custom_value(cls, stream, offset):
""" 读取用户数据value """
_type = unpack_from('<B', stream, offset)[0]

Expand All @@ -87,13 +87,15 @@ def _read_custom_value(stream, offset):
_l = unpack_from('<I', stream, offset)[0]
return unpack_from('<%dB' % _l, stream, offset + calcsize('<I'))[0], offset + calcsize('<I%dB' % _l)
else:
return self._read_counted_str(stream, offset)
return cls._read_counted_str(stream, offset)

reader = lambda stream: _Reader.read(stream)

class Writer(object):

@staticmethod
def write(message):
class _Writer(object):

@classmethod
def write(cls, message):

stream = WriteBuffer()

Expand All @@ -118,17 +120,17 @@ def write(message):

if len(message.content) > 0:
for key, value in message.content.items():
self._write_custom_header(stream, key, value)
cls._write_custom_header(stream, key, value)

stream.int16(MessageType.HeaderType.endOfHeaders)

return stream

@staticmethod
def _write_custom_header(stream, key, value):
@classmethod
def _write_custom_header(cls, stream, key, value):
stream.int16(MessageType.HeaderType.custom)
stream.string(key)
self._write_custom_value(stream, value)
cls._write_custom_value(stream, value)

@staticmethod
def _write_custom_value(stream, value):
Expand All @@ -151,6 +153,8 @@ def _write_custom_value(stream, value):
stream.byte(MessageType.ValueFormat.countedString)
stream.string(value)

writer = lambda message: bytes(_Writer.write(message))


class WriteBuffer(bytearray):
def byte(self, v):
Expand Down
58 changes: 50 additions & 8 deletions taobaowspy/tmcclient.py
Expand Up @@ -6,38 +6,80 @@

from event import Event
from tornadowebsocket import WebSocket
from message import reader, writer, Message
from tornado import ioloop, iostream

logger = logging.getLogger(__name__)


class TmcClient(WebSocket, Event):
def __init__(self, url, app_key, app_secret):
def __init__(self, url, app_key, app_secret, group_name='default', *args, **kwargs):
super(TmcClient, self).__init__(url, *args, **kwargs)

assert isinstance(url, (str, unicode)) and len(url) > 0
assert isinstance(app_key, (str, unicode)) and len(app_key) > 0
assert isinstance(app_secret, (str, unicode)) and len(app_secret) > 0
assert isinstance(group_name, (str, unicode)) and len(group_name) > 0

self.url = url
self.app_secret = app_secret
self.app_key = app_key
self.group_name = group_name

def connect(self):
def create_sign(self, timestamp):
timestamp = timestamp if timestamp else int(round(time.time() * 1000))
params = {
'group_name': self.group_name,
'app_key': self.app_key,
'timestamp': timestamp,
}

self.stream.connect((self.host, self.port), self._on_connect)def
keys = params.keys()
keys.sort()

params = "%s%s%s" % (self.app_secret, str().join('%s%s' % (key, params[key]) for key in keys), self.app_secret)
return md5(params).hexdigest().upper()

def on_open(self):
pass
timestamp = int(round(time.time() * 1000))

params = {
'timestamp': str(timestamp),
'app_key': self.app_key,
'sdk': 'top-sdk-java-201403304',
'sign': self.create_sign(timestamp),
'group_name': self.group_name,
}

message = writer(Message(2, 0, flag=1, content=params))

self.write_binary(message)

def write_binary(self, message):
self.write_message(message, True)

def on_message(self, data):
pass
message = reader(data)
print message

def on_ping(self):
pass
print 'on_ping'

def on_pong(self):
pass
print 'on_pong'

def on_close(self):
pass
print 'on_close'

def on_unsupported(self):
print 'on_unsupported'


if __name__ == '__main__':
ws = TmcClient('ws://mc.api.tbsandbox.com/', '1021737885', 'sandboxbbf5579605d7936422c11af0e', 'default')
try:
ioloop.IOLoop.instance().start()
except KeyboardInterrupt:
pass
finally:
ws.close()

0 comments on commit 4aafb5b

Please sign in to comment.