Skip to content

Commit

Permalink
Switched out the low-level messaging protocol to a specialized class.
Browse files Browse the repository at this point in the history
  • Loading branch information
AnIrishDuck committed May 8, 2011
1 parent 2104995 commit a88d662
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 66 deletions.
19 changes: 12 additions & 7 deletions twistedfcp/error.py
Expand Up @@ -8,6 +8,11 @@
class FCPException(Failure, Exception):
"Base exception for all FCP errors."

class MalformedMessageException(FCPException):
"Indicates that the Freenet node sent a malformed message to the client."

class MessageException(FCPException):
"Base class for all exceptions that are raised by a specific FCP message."
def __init__(self, error_msg):
self.code = int(error_msg['Code'])
self.msg = error_msg['CodeDescription']
Expand All @@ -18,32 +23,32 @@ def __repr__(self): return "{0}: {1}".format(type(self), self.msg)

def __str__(self): return self.__repr__()

class FetchException(FCPException):
class FetchException(MessageException):
"An exception indicating a prior fetch operation has failed."
message = "GetFailed"

class PutException(FCPException):
class PutException(MessageException):
"An exception indicating a prior put operation has failed."
message = "PutFailed"

class ProtocolException(FCPException):
class ProtocolException(MessageException):
"An exception indicating a problem with the Freenet protocol."
message = "ProtocolError"

class IdentifierException(FCPException):
class IdentifierException(MessageException):
"""
An exception indicating that the client is attempting to reuse an
Identifier.
"""
message = "IdentifierCollision"

class UnknownNodeException(FCPException):
class UnknownNodeException(MessageException):
"An exception indicating that the specified node is not known."
message = "UnknownNodeIdentifier"

class UnknownPeerNoteException(FCPException):
class UnknownPeerNoteException(MessageException):
"An exception indicating that the specified peer note type is not known."
message = "UnknonwPeerNoteType"

error_dict = dict((klass.message, klass)
for klass in FCPException.__subclasses__())
for klass in MessageException.__subclasses__())
82 changes: 23 additions & 59 deletions twistedfcp/protocol.py
Expand Up @@ -14,8 +14,9 @@
from twisted.internet.defer import Deferred
from message import Message, IdentifiedMessage, ClientHello
from error import Failure, error_dict
from util import MessageBasedProtocol

class FreenetClientProtocol(protocol.Protocol):
class FreenetClientProtocol(MessageBasedProtocol):
"""
Defines a twisted implementation of the Freenet Client Protocol. There are
several important things to note about the internals of this class:
Expand All @@ -40,45 +41,17 @@ class FreenetClientProtocol(protocol.Protocol):
port = 9481

def __init__(self):
MessageBasedProtocol.__init__(self)
self.deferred = defaultdict(Deferred)
self.sessions = defaultdict(Deferred)

def connectionMade(self):
"On connection, sends a FCP ClientHello message."
self.sendMessage(ClientHello)

def dataReceived(self, data):
"Parses in received packets until none remain."
partial = data
while partial:
partial = self._parseOne(partial)

def _parseOne(self, data):
"""
Parses a single packet from ``data`` and handles it. Returns the remnant
of the ``data`` after the first message has been removed.
"""
safe_index = lambda x: data.index(x) if x in data else len(data)
endHeader = min(safe_index(p) for p in ['\nData\n', '\nEndMessage\n'])
header, data = data[:endHeader], data[endHeader:]

header = header.split('\n')
messageType = header[0]
message = dict(line.split('=') for line in header[1:])
if data.startswith('\nData\n'):
l = int(message['DataLength'])
data = data[len('\nData\n'):]
message['Data'], data = data[:l], data[l:]
else:
data = data[len('\nEndMessage\n'):]
logging.info("Received {0}".format(messageType))
logging.debug(message)
self._process(Message(messageType, message.items()))
return data

def _process(self, message):
def message_received(self, messageType, messageItems):
"Processes the received message, firing the necessary deferreds."
message = Message(messageType, messageItems.items())
if message.name in self.deferred:
deferred = self.deferred[message.name]
del self.deferred[message.name]
Expand All @@ -90,33 +63,6 @@ def _process(self, message):
del self.sessions[session_id]
result = deferred.callback(message)

def sendMessage(self, message, data=None):
"""
Sends a single ``message`` to the server. If ``data`` is specified, it
gets tacked on to the end of the message and a ``DataLength`` field is
added to the message arguments.
"""
self.transport.write(message.name)
self.transport.write('\n')
for key, value in message.args:
self.transport.write(str(key))
self.transport.write('=')
self.transport.write(str(value))
self.transport.write('\n')

if not data:
self.transport.write('EndMessage\n')
logging.info("Sent {0}".format(message.name))
logging.debug(message.args)
else:
self.transport.write('DataLength={0}\n'.format(len(data)))
self.transport.write('Data\n')
self.transport.write(data)
logging.info("Sent {0} (data length={1})".format(message.name,
len(data)))
logging.debug(message.args)

def do_session(self, msg, handler, data=None):
"""
Wraps the given message processing function ``f`` in session handling
Expand Down Expand Up @@ -189,6 +135,24 @@ def process(message):

return self.do_session(gen, process)

def get_all_peers(self):
list_msg = Message("ListPeers", [])
peers = []
done = Deferred()

def node_info(message):
peers.append(message.args)
self.deferred["NodeData"].addCallback(node_info)

def end_list(message):
del self.deferred["NodeData"]
done.callback(peers)

self.deferred["NodeData"].addCallback(node_info)
self.deferred["EndListPeers"].addCallback(end_list)
self.sendMessage(list_msg)
return done

class FCPFactory(protocol.Factory):
"A protocol factory that uses FCP."
protocol = FreenetClientProtocol
Expand Down
122 changes: 122 additions & 0 deletions twistedfcp/util.py
Expand Up @@ -2,6 +2,8 @@
Defines utility functions for the ``twistedfcp`` module.
"""
import logging
from twisted.protocols.basic import LineReceiver

def sequence(f):
"""
Expand Down Expand Up @@ -31,3 +33,123 @@ def er(msg):
return continue_defer(gen.next())
return _inner

class MessageBasedProtocol(LineReceiver):
"""
Defines a protocol that parses freenet-style messages. These messages take
the following form::
NodeHello
FCPVersion=2.0
ConnectionIdentifier=754595fc35701d76096d8279d15c57e6
Version=Fred,0.7,1.0,1231
Node=Fred
NodeLanguage=ENGLISH
ExtRevision=23771
Build=1231
Testnet=false
ExtBuild=26
CompressionCodecs=3 - GZIP(0), BZIP2(1), LZMA(2)
Revision=@custom@
EndMessage
Note that it is possible for messages to end with a ``Data`` line instead
of an ``EndMessage`` line. These messages will be followed by a binary data
string whose length was specified previously by a ``DataLength`` field.
Example::
AllData
Identifier=Request Number One
DataLength=37261 // length of data
StartupTime=1189683889
CompletionTime=1189683889
Metadata.ContentType=text/plain;charset=utf-8
Data
<37261 bytes of data>
"""
def __init__(self):
self.delimiter = "\n"
self.reset()

def reset(self):
"Resets this protocol to its original state (waiting for a new message)"
self.dataReceived = self.dataReceived
self.lineReceived = self.new_message
self.messageName = None
self.message = {}

def new_message(self, line):
"In this state, the protocol treats the line as the message name."
self.messageName = line
self.lineReceived = self.key_value

def key_value(self, line):
"In this state, the protocol treats the line as a ``key=value`` pair."
if line == "EndMessage":
self.end_message()
elif line == "Data":
if 'DataLength' not in self.message:
text = ('Encountered a "Data" ending in a message without '
'a "DataLength" key')
raise MalformedMessageException(text)
else:
self.message['Data'] = []
self.dataCount = 0
self.setRawMode()
else:
kv = line.split('=')
if len(kv) != 2:
text = 'Bad line encountered: "{0}" (expected "key=value")'
raise MalformedMessageException(text.format(line))
self.message.update([kv])

def rawDataReceived(self, data):
"""
In this state we parse the final data in the message.
.. note::
This state can only be reached when this instance is in "raw mode".
"""
self.dataCount += len(data)
self.message["Data"].append(data)
expected = int(self.message["DataLength"])
if self.dataCount >= expected:
all_data = ''.join(self.message["Data"])
self.message["Data"] = all_data[:expected]
self.end_message()
self.setLineMode(all_data[expected:])

def end_message(self):
"Process a fully received message and resets state."
logging.info("Received {0}.".format(self.messageName))
logging.debug(str(self.message))
self.message_received(self.messageName, self.message)
self.reset()

def sendMessage(self, message, data=None):
"""
Sends a single ``message`` to the server. If ``data`` is specified, it
gets tacked on to the end of the message and a ``DataLength`` field is
added to the message arguments.
"""
self.transport.write(message.name)
self.transport.write('\n')
for key, value in message.args:
self.transport.write(str(key))
self.transport.write('=')
self.transport.write(str(value))
self.transport.write('\n')

if not data:
self.transport.write('EndMessage\n')
logging.info("Sent {0}".format(message.name))
logging.debug(message.args)
else:
self.transport.write('DataLength={0}\n'.format(len(data)))
self.transport.write('Data\n')
self.transport.write(data)
logging.info("Sent {0} (data length={1})".format(message.name,
len(data)))
logging.debug(message.args)

0 comments on commit a88d662

Please sign in to comment.