Permalink
Browse files

more p2p

git-svn-id: svn://forre.st/undns@1208 470744a7-cac9-478e-843e-5ec1b25c69e8
  • Loading branch information...
1 parent 3fe4d08 commit c691f7de8de63b02e1561fac4a5c0721dfdef9c8 forrest committed Mar 22, 2011
Showing with 148 additions and 84 deletions.
  1. +148 −84 p2p.py
View
232 p2p.py
@@ -11,9 +11,9 @@
from twisted.python import failure
import itertools
-import pygame
+#import pygame
d = None
-d = pygame.display.set_mode((512, 512))
+#d = pygame.display.set_mode((512, 512))
def get_coords(o):
a = hash(o)
a, x = divmod(a, 512)
@@ -35,12 +35,16 @@ def draw():
draw()
def do_work(x, difficulty, stop_flag):
+ d = "[%s, " % json.dumps(x)
+ h = hashlib.sha1(d)
for i in itertools.count(random.randrange(2**63)):
if stop_flag[0]:
return None
- d = json.dumps((x, i))
- if int(hashlib.sha1(d).hexdigest(), 16) % difficulty == 0:
- return d
+ d2 = "%i]" % i
+ h2 = h.copy()
+ h2.update(d2)
+ if int(h2.hexdigest(), 16) % difficulty == 0:
+ return d + d2
def insort(a, x, lo=0, hi=None, key=lambda x: x):
if lo < 0:
@@ -58,8 +62,15 @@ def insort(a, x, lo=0, hi=None, key=lambda x: x):
DEFAULT_TIMEOUT = 10#s
+def count(i):
+ r = {}
+ for item in i:
+ r[item] = r.get(item, 0) + 1
+ return r
+
class RemoteNode(object):
def __init__(self, protocol, address, id):
+ assert isinstance(address, tuple), address
self.protocol = protocol
self.address = address
self.id = id
@@ -79,6 +90,10 @@ def do_rpc(*args, **kwargs):
tag = random.randrange(2**160)
d = defer.Deferred()
def timeout_func():
+ if self in self.protocol.peers:
+ self.protocol.peers.remove(self)
+ self.protocol.bad_peers.add(self)
+ print "query timed out"
d, t = self.protocol.queries.pop(tag)
d.errback(defer.TimeoutError())
t = reactor.callLater(timeout, timeout_func)
@@ -94,7 +109,7 @@ def timeout_func():
class RemoteError(Exception):
pass
-GENESIS_DIFFICULTY = 10
+GENESIS_DIFFICULTY = 400000
class Block(object):
@classmethod
@@ -116,8 +131,6 @@ def __init__(self, data):
self.hash = int(hashlib.sha1(data).hexdigest(), 16)
(self.previous_hash, self.pos, self.message, self.difficulty), nonce = json.loads(data)
-tags = set()
-
class Node(protocol.DatagramProtocol):
# characteristics
@@ -133,6 +146,7 @@ def distance_to_id(self, other_id):
def __init__(self, port, bootstrap_addresses):
#protocol.DatagramProtocol.__init__(self)
self.peers = []
+ self.bad_peers = set()
self.contacts = {} # (address, id) -> RemoteNode()
self.queries = {}
self.port = port
@@ -154,7 +168,7 @@ def startProtocol(self):
# utility functions
def say(self, *x):
- print " " * (self.port%200), self.port, ' '.join(map(str,x))
+ print " " * (self.port%120), self.port, ' '.join(map(str,x))
def think(self):
if self.bootstrap_addresses:
@@ -170,13 +184,14 @@ def try_to_do_something(self):
if previous_block is None:
previous_hash = None
pos = 0
- message = [self.port]
+ message = {self.port: 1}
difficulty = GENESIS_DIFFICULTY
else:
previous_hash = previous_block.hash
pos = previous_block.pos + 1
- message = previous_block.message + [self.port]
- difficulty = previous_block.difficulty + (previous_block.difficulty + 999) // 1000
+ message = dict((int(k), int(v)) for k, v in previous_block.message.iteritems())
+ message[self.port] = message.get(self.port, 0) + 1
+ difficulty = previous_block.difficulty + 1 # (previous_block.difficulty + 999) // 1000
d = Block.generate(previous_hash, pos, message, difficulty)
def abort(d=d):
@@ -187,26 +202,28 @@ def abort(d=d):
try:
result = yield d
except defer.CancelledError:
+ self.say("cancelled")
continue # we aborted because of a new longest chain
- self.say("generated", result.message)
+ self.say("generated", result.pos, result.message)
self.received_block(result, self)
- d2 = defer.Deferred()
- reactor.callLater(random.expovariate(1/3), d2.callback, "pineapple")
- yield d2
- del d2
+ #d2 = defer.Deferred()
+ #reactor.callLater(random.expovariate(1/3), d2.callback, "pineapple")
+ #yield d2
+ #del d2
- def received_block(self, block, from_node=None):
+ def received_block(self, block, from_node=None, depth=0):
if block.hash in self.verified:
return "already verified"
if block.hash % block.difficulty != 0:
return "invalid nonce"
- if self.best_block is not None and block.pos < self.best_block.pos - 16:
- return "you lose"
+ # this needs to change ... it should compare against all blocks, not the best verified block
+ #if self.best_block is not None and block.pos < self.best_block.pos - 16:
+ # return "you lose"
if block.pos == 0:
if block.previous_hash is not None:
@@ -216,47 +233,70 @@ def received_block(self, block, from_node=None):
return "genesis difficulty"
self.blocks[block.hash] = block
- self.verified_block(block, from_node)
+ self.referrers.setdefault(block.previous_hash, set()).add(block)
+ self.say("g_received", block.pos, block.message)
+ self.verified_block(block, from_node, depth=depth + 1)
+ elif block.previous_hash not in self.verified:
+ self.blocks[block.hash] = block
+ self.referrers.setdefault(block.previous_hash, set()).add(block)
+ self.say("h_received", block.pos, block.message)
+
+ b = block
+ while True:
+ assert b.previous_hash is not None
+ if b.previous_hash not in self.blocks:
+ if from_node is None:
+ from_node = random.choice(self.peers)
+ def got_block(datas):
+ print datas
+ self.blocks.pop(b.previous_hash)
+ for data in reversed(datas):
+ block2 = Block(data)
+ self.received_block(block2)
+ def got_error(fail):
+ self.blocks.pop(b.previous_hash)
+ print fail
+ self.blocks[b.previous_hash] = None
+ print "requesting block before", b.pos
+ from_node.rpc_get_blocks(b.previous_hash, 20, timeout=5).addCallbacks(got_block, got_error)
+ return "waiting on block.."
+ b = self.blocks[b.previous_hash]
+ if b is None:
+ return # in progress
else:
- if block.previous_hash not in self.blocks or block.previous_hash not in self.verified:
- self.blocks[block.hash] = block
-
- self.referrers.setdefault(block.hash, []).append(block)
- if from_node is None:
- from_node = random.choice(self.peers)
- def got_block(data):
- if data is None: return
- self.received_block(Block(data))
- from_node.rpc_get_block(block.previous_hash).addCallback(got_block)
-
- return "chain not formed, XXX maybe use deferred ..."
- else:
- previous_block = self.blocks[block.previous_hash]
-
- if block.pos != previous_block.pos + 1:
- return "pos needs to advance by 1"
-
- if block.difficulty != previous_block.difficulty + (previous_block.difficulty + 999) // 1000:
- return "difficulty must follow pattern"
-
- self.blocks[block.hash] = block
- self.verified_block(block)
+ previous_block = self.blocks[block.previous_hash]
+
+ if block.pos != previous_block.pos + 1:
+ return "pos needs to advance by 1"
+
+ if block.difficulty != previous_block.difficulty + 1: #(previous_block.difficulty + 999) // 1000:
+ return "difficulty must follow pattern"
+
+ self.blocks[block.hash] = block
+ self.referrers.setdefault(block.previous_hash, set()).add(block)
+ self.say("i_received", block.pos, block.message)
+ self.verified_block(block, depth=depth + 1)
- def verified_block(self, block, from_node=None):
+ def verified_block(self, block, from_node=None, depth=0):
assert block.hash in self.blocks
self.verified.add(block.hash)
+ self.say("verified", block.pos, block.message)
- for referring_block in self.referrers.pop(block.hash, []):
- self.received_block(referring_block) # no from_node here because we might send the newly released block back
+ for referring_block in self.referrers.pop(block.hash, set()):
+ if depth > 100:
+ reactor.callLater(0, self.received_block, referring_block) # no from_node here because we might send the newly released block back
+ else:
+ self.received_block(referring_block, depth=depth+1)
for peer in self.peers:
if peer == from_node:
continue
self.say("spreading to", peer.address[1])
- peer.rpc_gossip(block.data)
+ peer.rpc_gossip(block.data).addErrback(lambda fail: None)
if self.best_block is None or block.pos > self.best_block.pos:
+ self.say("new best", block.pos, block.message)
self.best_block = block
cbs = self.best_block_callbacks
@@ -275,14 +315,18 @@ def add_contact(self, address, remote_id=None):
rn = RemoteNode(self, address, remote_id)
self.contacts[(address, remote_id)] = rn
insort(self.peers, rn, key=self.distance_to_node)
- line((self.distance_to_node(rn)/(2.**128)*255, 0, self.distance_to_node(rn)/(2.**128)*255), get_coords(self.id), get_coords(remote_id))
+ dist = self.distance_to_node(rn)
+ reactor.callLater(.0, line, (255-dist/(2.**160)*255, 0, 255-dist/(2.**160)*255), get_coords(self.id), get_coords(rn.id))
@defer.inlineCallbacks
def f(his_hash):
if his_hash is None:
return
if his_hash in self.blocks:
return
- block = Block((yield rn.rpc_get_block(his_hash)))
+ try:
+ block = Block((yield rn.rpc_get_block(his_hash)))
+ except defer.TimeoutError:
+ return
if block is None: return # shouldn't happen, ever ...
self.received_block(block, rn)
rn.rpc_get_best_block_hash().addCallback(f)
@@ -292,7 +336,7 @@ def f(his_hash):
def ask_random_contact_for_peers(self):
if not self.peers:
return
- c = random.choice(self.peers[:5]) # closest
+ c = random.choice(self.peers) # closest
for address, id in (yield c.rpc_get_close_nodes(self.id, 2)):
address = tuple(address) # list -> tuple
self.add_contact(address, id)
@@ -306,7 +350,7 @@ def get_time_offset(self, timeout=DEFAULT_TIMEOUT):
for call in calls:
try:
ts, other_time = yield call
- except:
+ except Exception:
continue
results.append((begin+ts)/2 - other_time)
print results
@@ -331,10 +375,6 @@ def datagramReceived(self, datagram, addr):
t.cancel()
- #print self.port, tag, repr(response)
- if tag in tags:
- print "AHHHHHHHHHHHHH"
- tags.add(tag)
if is_error:
d.errback(RemoteError(response))
else:
@@ -345,7 +385,7 @@ def datagramReceived(self, datagram, addr):
method = getattr(self, "rpc_" + method_name)
try:
- v = yield method((addr, remote_id, rn), *args)
+ v = yield method(rn, *args)
except Exception, e:
is_error = True
response = str(e)
@@ -356,38 +396,59 @@ def datagramReceived(self, datagram, addr):
# RPCs
- def rpc_ping(self, _):
+ def rpc_ping(self, node):
return defer.succeed("pong")
- def rpc_get_contacts(self, _):
+ def rpc_get_contacts(self, node):
return defer.succeed([(c.address, c.id) for c in self.peers])
- def rpc_get_my_address(self, (address, id, node)):
- return defer.succeed(address)
+ def rpc_get_my_address(self, node):
+ return defer.succeed(node.address)
- def rpc_get_close_nodes(self, _, dest, n):
+ def rpc_get_close_nodes(self, node, dest, n):
return defer.succeed([(close_peer.address, close_peer.id) for close_peer in sorted(self.peers, key=lambda peer: peer.distance_to_id(dest))[:n]])
- def rpc_get_best_block_hash(self, _):
- if self.best_block is None:
- return defer.succeed(None)
- return defer.succeed(self.best_block.hash)
+ def rpc_get_best_block_hash(self, node):
+ best_block_hash = None
+ if self.best_block is not None:
+ best_block_hash = self.best_block.hash
+ return defer.succeed(best_block_hash)
- def rpc_gossip(self, (address, id, node), x):
- line((255-self.distance_to_id(id)/(2.**128)*255, 255, 255-self.distance_to_id(id)/(2.**128)*255), get_coords(self.id), get_coords(id))
- reactor.callLater(.1, line, (255-self.distance_to_id(id)/(2.**128)*255, 0, 255-self.distance_to_id(id)/(2.**128)*255), get_coords(self.id), get_coords(id))
+ def rpc_gossip(self, node, block_data):
+ dist = self.distance_to_node(node)
+ reactor.callLater(.0, line, (255-dist/(2.**160)*255, 255, 255-dist/(2.**160)*255), get_coords(self.id), get_coords(node.id))
+ reactor.callLater(.1, line, (255-dist/(2.**160)*255, 0, 255-dist/(2.**160)*255), get_coords(self.id), get_coords(node.id))
- self.received_block(Block(x), node)
+ self.received_block(Block(block_data), node)
return defer.succeed(None)
- def rpc_get_block(self, _, hash):
- if hash in self.blocks:
- return defer.succeed(self.blocks[hash].data)
- else:
- return defer.succeed(None)
+ def rpc_get_block(self, node, block_hash):
+ block_data = None
+ if block_hash in self.blocks:
+ block = self.blocks[block_hash]
+ assert block.hash == block_hash
+ block_data = block.data
+ return defer.succeed(block_data)
- def rpc_get_time(self, _):
+ def rpc_get_blocks(self, node, block_hash, n):
+ result = []
+ while True:
+ try:
+ block = self.blocks[block_hash]
+ except KeyError:
+ break
+ if block is None:
+ break
+ result.append(block.data)
+ if len(result) >= n:
+ break
+ block_hash = block.previous_hash
+ if block_hash is None:
+ break
+ return defer.succeed(result)
+
+ def rpc_get_time(self, node):
return defer.succeed(time.time())
def median(x):
@@ -398,6 +459,7 @@ def median(x):
return (y[left] + y[right])/2
def parse(x):
+ if ':' not in x: return ('127.0.0.1', int(x))
ip, port = x.split(':')
return ip, int(port)
@@ -414,23 +476,23 @@ def add_node(knowns=[]):
while True:
port = random.randrange(49152, 65536)
try:
- reactor.listenUDP(port, Node(port, [("127.0.0.1", x) for x in knowns]))
+ reactor.listenUDP(port, Node(port, knowns))
except error.CannotListenError:
pass
else:
- return port
+ return ('127.0.0.1', port)
if 0:
pool = []
task.LoopingCall(lambda: pool.append(add_node(random.sample(pool, 1) if pool else []))).start(13)
-
-pool = []
-for i in xrange(3):
- reactor.callLater(10*i, lambda: pool.append(add_node(random.sample(pool, 1) if pool else [])))
+if 0:
+ pool = []
+ for i in xrange(3):
+ reactor.callLater(10*i, lambda: pool.append(add_node(random.sample(pool, 1) if pool else [])))
-print "---"
+#print "---"
#port = random.randrange(49152, 65536)
#reactor.listenUDP(port, Node(port, map(parse, sys.argv[1:])))
@@ -443,6 +505,8 @@ def add_node(knowns=[]):
def print_line(x):
print x
-#task.LoopingCall(print_line, "").start(.25)
+task.LoopingCall(print_line, "").start(.5)
+
+add_node(map(parse, sys.argv[1:]))
reactor.run()

0 comments on commit c691f7d

Please sign in to comment.