Skip to content

Commit

Permalink
work on p2p
Browse files Browse the repository at this point in the history
git-svn-id: svn://forre.st/undns@1197 470744a7-cac9-478e-843e-5ec1b25c69e8
  • Loading branch information
forrest committed Mar 19, 2011
1 parent e730b75 commit 4142708
Showing 1 changed file with 165 additions and 110 deletions.
275 changes: 165 additions & 110 deletions p2p.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ def draw():
reactor.callLater(.1, draw)
draw()

def do_work(x, difficulty, stop_flag=[False], start=0):
for i in itertools.count(start):
def do_work(x, difficulty, stop_flag=[False]):
for i in itertools.count(random.randrange(2**63)):
if stop_flag[0]: return
d = json.dumps((x, i))
if int(hashlib.sha1(d).hexdigest(), 16) % difficulty == 0:
Expand Down Expand Up @@ -92,6 +92,17 @@ class RemoteError(Exception):
GENESIS_DIFFICULTY = 10000

class Node(protocol.DatagramProtocol):

# characteristics

def distance_to_node(self, other):
return self.distance_to_id(other.id)

def distance_to_id(self, other_id):
return self.id ^ other_id

# initialization

def __init__(self, port, bootstrap_addresses):
#protocol.DatagramProtocol.__init__(self)
self.peers = []
Expand All @@ -106,27 +117,32 @@ def __init__(self, port, bootstrap_addresses):
self.longest_chain_head_hash = None
self.longest_chain_head_callbacks = []

# we rely on do_work being deterministic here ...
if self.accept(do_work((None, 0, "genesis", GENESIS_DIFFICULTY), GENESIS_DIFFICULTY)) is not None:
raise ValueError
if not self.blocks:
if self.accept(do_work((None, 0, "genesis", GENESIS_DIFFICULTY), GENESIS_DIFFICULTY), self.id) is not None:
raise ValueError

def startProtocol(self):
circles.append(((255, 0, 0), get_coords(self.id), 5))
reactor.callLater(random.expovariate(1/.1), self.think)
reactor.callLater(random.uniform(10, 12), self.try_to_do_something)
self.think()
self.try_to_do_something()
#reactor.callLater(random.expovariate(1/.1), self.think)
#reactor.callLater(random.uniform(10, 12), self.try_to_do_something)

# utility functions

def think(self):
reactor.callLater(random.expovariate(1/.1), self.think)

if self.bootstrap_addresses:
self.add_contact(random.choice(self.bootstrap_addresses))

self.ask_random_contact_for_peers()

reactor.callLater(random.expovariate(1/.1), self.think)
#print len(self.peers)

@defer.inlineCallbacks
def try_to_do_something(self):
while True:
#self.get_time_offset().addCallback(print_line)
previous_hash = self.longest_chain_head_hash
if previous_hash is None:
d = defer.Deferred()
Expand All @@ -137,8 +153,9 @@ def try_to_do_something(self):
previous_previous_hash, previous_pos, previous_message, previous_difficulty = previous_contents

pos = previous_pos + 1
message = random.randrange(2**64)
message = "%i was here! w00t" % (self.port,)
difficulty = previous_difficulty + (previous_difficulty + 999) // 1000
difficulty = previous_difficulty * 2

contents = previous_hash, pos, message, difficulty

Expand All @@ -147,22 +164,91 @@ def abort():
stop_flag[0] = True
self.longest_chain_head_callbacks.append(abort)

print self.port, "start", pos
#print self.port, "start", pos

result = yield threads.deferToThread(do_work, contents, difficulty, stop_flag, random.randrange(2**64))
result = yield threads.deferToThread(do_work, contents, difficulty, stop_flag)

if result is None: # we aborted because of a new longest chain
print self.port, "aborted", pos
#print self.port, "aborted", pos
continue

print self.port, "generated", pos

self.rpc_gossip(None, result)
err = self.accept(result, self.id)
if err is not None:
print "GENERATED BLOCK NOT ACCEPTED BY SELF"

d = defer.Deferred()
reactor.callLater(random.expovariate(1/1), d.callback, None)
yield d
#d = defer.Deferred()
#reactor.callLater(random.expovariate(1/1), d.callback, None)
#yield d

def accept(self, block, from_id):
try:
result = self.accept2(block)
except Exception, e:
#traceback.print_exc()
result = str(e)
#return False
print self.port, "RECEIVED BLOCK"
print " BLOCK:", block
print " RESULT:", result
if result is not None:
return result
for peer in self.peers:
if peer.id == from_id:
continue
peer.rpc_gossip(block)

def accept2(self, block):
hash = int(hashlib.sha1(block).hexdigest(), 16)

if hash in self.blocks:
return "already accepted"

contents, nonce = json.loads(block)

previous_hash, pos, message, difficulty = contents

if hash % difficulty != 0:
return "invalid nonce"

if self.longest_chain_head_hash is not None and pos < self.longest_chain_head_pos - 10:
return "you lose"

if pos == 0:
if previous_hash is not None:
return "genesis block can't refer to previous..."

if difficulty != GENESIS_DIFFICULTY:
return "genesis difficulty"
else:
if previous_hash not in self.blocks:
p = random.choice(self.peers)
p.rpc_get_block(previous_hash).addCallback(lambda block: self.accept(block, p.id)).addCallback(print_line)
return "chain not formed, XXX maybe use deferred ..."

previous_block = self.blocks[previous_hash]
previous_contents, previous_nonce = json.loads(previous_block)
prevous_hash, previous_pos, previous_message, previous_difficulty = previous_contents

if pos != previous_pos + 1:
return "pos needs to advance by 1"

if difficulty != previous_difficulty * 2: # previous_difficulty + (previous_difficulty + 999) // 1000:
return "difficulty must follow pattern"

self.blocks[hash] = block
#print self.port, "received", pos

if self.longest_chain_head_hash is None or pos > self.longest_chain_head_pos:
self.longest_chain_head_hash = hash
self.longest_chain_head_pos = pos
cbs = self.longest_chain_head_callbacks
self.longest_chain_head_callbacks = []
for cb in cbs:
cb()

@defer.inlineCallbacks
def add_contact(self, address, remote_id=None):
if address in self.contacts:
return
Expand All @@ -175,25 +261,46 @@ def add_contact(self, address, remote_id=None):
self.contacts[address] = rn
insort(self.peers, rn, key=self.distance_to_node)
line((self.distance_to_node(rn)/(2.**128)*255, 0, self.distance_to_node(rn)/(2.**128)*255), get_coords(self.id), get_coords(remote_id))
his_hash = yield rn.rpc_get_longest_chain_head_hash()
if his_hash in self.blocks:
return
block = yield rn.rpc_get_block(his_hash)
self.accept(block, rn.id)

@defer.inlineCallbacks
def ask_random_contact_for_peers(self):
if not self.contacts:
return
#if not self.contacts:
# return
#c = random.choice(self.contacts.values())
c = self.peers[0] # closest
if not self.peers:
return
c = random.choice(self.peers[:5]) # closest
for address, id in (yield c.rpc_get_close_nodes(self.id, 2)):
address = tuple(address) # list -> tuple
self.add_contact(address, id)

def distance_to_node(self, other):
return self.distance_to_id(other.id)
@defer.inlineCallbacks
def get_time_offset(self, timeout=DEFAULT_TIMEOUT):
nodes = random.sample(self.peers, min(6, len(self.peers)))
calls = [node.rpc_get_time(timeout=timeout).addCallback(lambda other_time: (time.time(), other_time)) for node in nodes]
begin = time.time()
results = [0]
for call in calls:
try:
ts, other_time = yield call
except:
continue
results.append((begin+ts)/2 - other_time)
print results
defer.returnValue(median(results))

def distance_to_id(self, other):
return self.id ^ other
# network

@defer.inlineCallbacks
def datagramReceived(self, datagram, addr):
#if random.randrange(100) == 0:
# return # randomly drop packets

remote_id, is_answer, contents = json.loads(datagram)
self.add_contact(addr, remote_id)
if is_answer:
Expand Down Expand Up @@ -221,6 +328,8 @@ def datagramReceived(self, datagram, addr):
response = v
self.transport.write(json.dumps((self.id, True, (tag, is_error, response))), addr)

# RPCs

def rpc_ping(self, _):
return defer.succeed("pong")

Expand All @@ -236,123 +345,69 @@ def rpc_get_close_nodes(self, _, dest, n):
def rpc_get_longest_chain_head_hash(self, _):
return self.longest_chain_head_hash

def rpc_gossip(self, _, x):
if _ is not None:
address, id = _
line((255-self.distance_to_id(id)/(2.**128)*255, 255, 255-self.distance_to_id(id)/(2.**128)*255), get_coords(self.id), get_coords(id))
reactor.callLater(.1, line, (255-self.distance_to_id(id)/(2.**128)*255, 0, 255-self.distance_to_id(id)/(2.**128)*255), get_coords(self.id), get_coords(id))
result = self.accept(x)
#print self.port, "RECEIVED BLOCK"
#print " BLOCK:", x
#print " RESULT:", result
def rpc_gossip(self, (address, id), x):
line((255-self.distance_to_id(id)/(2.**128)*255, 255, 255-self.distance_to_id(id)/(2.**128)*255), get_coords(self.id), get_coords(id))
reactor.callLater(.1, line, (255-self.distance_to_id(id)/(2.**128)*255, 0, 255-self.distance_to_id(id)/(2.**128)*255), get_coords(self.id), get_coords(id))

result = self.accept(x, id)
if result is not None:
return
for peer in self.peers:
peer.rpc_gossip(x)

def rpc_get_block(self, hash):
def rpc_get_block(self, _, hash):
return defer.succeed(self.blocks[hash])

def accept(self, block):
try:
res = self.accept2(block)
except Exception, e:
#traceback.print_exc()
return str(e)
#return False
if res is not None:
return res

def accept2(self, block):
hash = int(hashlib.sha1(block).hexdigest(), 16)

if hash in self.blocks:
return "already accepted"

contents, nonce = json.loads(block)

previous_hash, pos, message, difficulty = contents

if hash % difficulty != 0:
return "invalid nonce"

if pos == 0:
if previous_hash is not None:
return "genesis block can't refer to previous..."

if difficulty != GENESIS_DIFFICULTY:
return "genesis difficulty"
else:
if previous_hash not in self.blocks:
random.choice(self.peers).get_block(previous_hash).addCallback(self.accept).addCallback(print_line)
return "chain not formed, XXX maybe use deferred ..."

previous_block = self.blocks[previous_hash]
previous_contents, previous_nonce = json.loads(previous_block)
prevous_hash, previous_pos, previous_message, previous_difficulty = previous_contents

if pos != previous_pos + 1:
return "pos needs to advance by 1"

if difficulty != previous_difficulty + (previous_difficulty + 999) // 1000:
return "difficulty must follow pattern"

self.blocks[hash] = block
#print self.port, "received", pos

if self.longest_chain_head_hash is None or pos > self.longest_chain_head_pos:
self.longest_chain_head_hash = hash
self.longest_chain_head_pos = pos
cbs = self.longest_chain_head_callbacks
self.longest_chain_head_callbacks = []
for cb in cbs:
cb()
def rpc_get_time(self, _):
return defer.succeed(time.time())

def median(x):
# don't really need a complex algorithm here
y = sorted(x)
left = (len(y) - 1)//2
right = len(y)//2
return (y[left] + y[right])/2

def parse(x):
ip, port = x.split(':')
return ip, int(port)

if 0:
last = None
for i in xrange(100):
for i in xrange(5):
port = random.randrange(49152, 65536)
reactor.listenUDP(port, Node(port, [] if last is None else [("127.0.0.1", last)]))
print port
last = port

if 1:
pool = []
for i in xrange(1):
main = random.randrange(49152, 65536)
try:
reactor.listenUDP(main, Node(main, []))
except error.CannotListenError:
pass
else:
pool.append(main)
for i in xrange(30):

def add_node(knowns=[]):
while True:
port = random.randrange(49152, 65536)
try:
reactor.listenUDP(port, Node(port, [("127.0.0.1", random.choice(pool))]))
reactor.listenUDP(port, Node(port, [("127.0.0.1", x) for x in knowns]))
except error.CannotListenError:
pass
else:
print port
return port

if 1:
pool = []
task.LoopingCall(lambda: pool.append(add_node(random.sample(pool, 1) if pool else []))).start(3)



print "---"

#port = random.randrange(49152, 65536)
#reactor.listenUDP(port, Node(port, map(parse, sys.argv[1:])))
#print port

@defer.inlineCallbacks
def x():
print (yield threads.deferToThread(do_work, "hello", 2**20))

#@defer.inlineCallbacks
#def x():
# print (yield threads.deferToThread(do_work, "hello", 2**20))
#x()
#def print_line(x):
# print x

def print_line(x):
print x
#task.LoopingCall(print_line, "").start(.25)

reactor.run()

0 comments on commit 4142708

Please sign in to comment.