Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

broadcast shares in serial

  • Loading branch information...
commit 6f1a456b21db79b06cd6d3edd1904dd3c597b981 1 parent 3116643
@forrestv authored
View
5 p2pool/main.py
@@ -391,6 +391,7 @@ def _(header):
for peer in p2p_node.peers.itervalues():
peer.send_bestblock(header=header)
+ @defer.inlineCallbacks
def broadcast_share(share_hash):
shares = []
for share in tracker.get_chain(share_hash, min(5, tracker.get_height(share_hash))):
@@ -399,8 +400,8 @@ def broadcast_share(share_hash):
shared_share_hashes.add(share.hash)
shares.append(share)
- for peer in p2p_node.peers.itervalues():
- peer.sendShares([share for share in shares if share.peer is not peer])
+ for peer in list(p2p_node.peers.itervalues()):
+ yield peer.sendShares([share for share in shares if share.peer is not peer])
# send share when the chain changes to their chain
best_share_var.changed.watch(broadcast_share)
View
10 p2pool/p2p.py
@@ -25,6 +25,8 @@ def __init__(self, node, incoming):
self.connected2 = False
def connectionMade(self):
+ p2protocol.Protocol.connectionMade(self)
+
self.factory.proto_made_connection(self)
self.addr = self.transport.getPeer().host, self.transport.getPeer().port
@@ -205,12 +207,14 @@ def handle_shares(self, shares):
def sendShares(self, shares):
def att(f, **kwargs):
try:
- f(**kwargs)
+ return f(**kwargs)
except p2protocol.TooLong:
att(f, **dict((k, v[:len(v)//2]) for k, v in kwargs.iteritems()))
- att(f, **dict((k, v[len(v)//2:]) for k, v in kwargs.iteritems()))
+ return att(f, **dict((k, v[len(v)//2:]) for k, v in kwargs.iteritems()))
if shares:
- att(self.send_shares, shares=[share.as_share() for share in shares])
+ return att(self.send_shares, shares=[share.as_share() for share in shares])
+ else:
+ return defer.succeed(None)
message_sharereq = pack.ComposedType([
View
16 p2pool/util/p2protocol.py
@@ -9,7 +9,7 @@
from twisted.python import log
import p2pool
-from p2pool.util import datachunker
+from p2pool.util import datachunker, variable
class TooLong(Exception):
pass
@@ -19,6 +19,19 @@ def __init__(self, message_prefix, max_payload_length):
self._message_prefix = message_prefix
self._max_payload_length = max_payload_length
self.dataReceived = datachunker.DataChunker(self.dataReceiver())
+ self.paused_var = variable.Variable(False)
+
+ def connectionMade(self):
+ self.transport.registerProducer(self, True)
+
+ def pauseProducing(self):
+ self.paused_var.set(True)
+
+ def resumeProducing(self):
+ self.paused_var.set(False)
+
+ def stopProducing(self):
+ pass
def dataReceiver(self):
while True:
@@ -74,6 +87,7 @@ def sendPacket(self, command, payload2):
if len(payload) > self._max_payload_length:
raise TooLong('payload too long')
self.transport.write(self._message_prefix + struct.pack('<12sI', command, len(payload)) + hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4] + payload)
+ return self.paused_var.get_when_satisfies(lambda paused: not paused)
def __getattr__(self, attr):
prefix = 'send_'
View
14 p2pool/util/variable.py
@@ -66,10 +66,12 @@ def set(self, value):
self.changed.happened(value)
self.transitioned.happened(oldvalue, value)
+ @defer.inlineCallbacks
+ def get_when_satisfies(self, func):
+ while True:
+ if func(self.value):
+ defer.returnValue(self.value)
+ yield self.changed.once.get_deferred()
+
def get_not_none(self):
- if self.value is not None:
- return defer.succeed(self.value)
- else:
- df = defer.Deferred()
- self.changed.once.watch(df.callback)
- return df
+ return self.get_when_satisfies(lambda val: val is not None)
Please sign in to comment.
Something went wrong with that request. Please try again.