/
p2protocol.py
105 lines (85 loc) · 3.85 KB
/
p2protocol.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
'''
Generic message-based protocol used by Bitcoin and P2Pool for P2P communication
'''
import hashlib
import struct
from twisted.internet import protocol
from twisted.python import log
import p2pool
from p2pool.util import datachunker, variable
class TooLong(Exception):
pass
class Protocol(protocol.Protocol):
def __init__(self, message_prefix, max_payload_length, traffic_happened=variable.Event()):
self._message_prefix = message_prefix
self._max_payload_length = max_payload_length
self.dataReceived2 = datachunker.DataChunker(self.dataReceiver())
self.paused_var = variable.Variable(False)
self.traffic_happened = traffic_happened
def connectionMade(self):
self.transport.registerProducer(self, True)
def pauseProducing(self):
self.paused_var.set(True)
def resumeProducing(self):
self.paused_var.set(False)
def stopProducing(self):
pass
def dataReceived(self, data):
self.traffic_happened.happened('p2p/in', len(data))
self.dataReceived2(data)
def dataReceiver(self):
while True:
start = ''
while start != self._message_prefix:
start = (start + (yield 1))[-len(self._message_prefix):]
command = (yield 12).rstrip('\0')
length, = struct.unpack('<I', (yield 4))
if length > self._max_payload_length:
print 'length too large'
continue
checksum = yield 4
payload = yield length
if hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4] != checksum:
print 'invalid hash for', self.transport.getPeer().host, repr(command), length, checksum.encode('hex'), hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4].encode('hex'), payload.encode('hex')
continue
type_ = getattr(self, 'message_' + command, None)
if type_ is None:
if p2pool.DEBUG:
print 'no type for', repr(command)
continue
try:
self.packetReceived(command, type_.unpack(payload))
except:
print 'RECV', command, payload[:100].encode('hex') + ('...' if len(payload) > 100 else '')
log.err(None, 'Error handling message: (see RECV line)')
self.transport.loseConnection()
def packetReceived(self, command, payload2):
handler = getattr(self, 'handle_' + command, None)
if handler is None:
if p2pool.DEBUG:
print 'no handler for', repr(command)
return
handler(**payload2)
def badPeerHappened(self):
self.transport.loseConnection()
def sendPacket(self, command, payload2):
if len(command) >= 12:
raise ValueError('command too long')
type_ = getattr(self, 'message_' + command, None)
if type_ is None:
raise ValueError('invalid command')
#print 'SEND', command, repr(payload2)[:500]
payload = type_.pack(payload2)
if len(payload) > self._max_payload_length:
raise TooLong('payload too long')
data = self._message_prefix + struct.pack('<12sI', command, len(payload)) + hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4] + payload
self.traffic_happened.happened('p2p/out', len(data))
self.transport.write(data)
return self.paused_var.get_when_satisfies(lambda paused: not paused)
def __getattr__(self, attr):
prefix = 'send_'
if attr.startswith(prefix):
command = attr[len(prefix):]
return lambda **payload2: self.sendPacket(command, payload2)
#return protocol.Protocol.__getattr__(self, attr)
raise AttributeError(attr)