-
Notifications
You must be signed in to change notification settings - Fork 1
/
p2p_network.rb
145 lines (119 loc) · 3.58 KB
/
p2p_network.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
require 'websocket-eventmachine-client'
require 'websocket-eventmachine-server'
require_relative 'transaction_pool'
module MessageType
QUERY_LATEST = 0
QUERY_ALL = 1
RESPONSE_BLOCKCHAIN = 2
QUERY_TRANSACTION_POOL = 3
RESPONSE_TRANSACTION_POOL = 4
end
class P2PNetwork
attr_accessor :blockchain
def initialize(server_port, initial_peers, blockchain)
@sockets = []
@server_port = server_port
@initial_peers = initial_peers || []
@blockchain = blockchain
end
def start
connect_to_peers(@initial_peers)
init_p2p_server
end
def connect_to_peers(new_peers)
new_peers.each do |p|
ws = WebSocket::EventMachine::Client.connect(:uri => p)
ws.onopen do
init_p2p_connections(ws)
end
ws.onerror do
pp 'connection failed'
end
end
end
def init_p2p_server
WebSocket::EventMachine::Server.start(:host => "0.0.0.0", :port => @server_port) do |ws|
ws.onopen do
pp 'connected'
init_p2p_connections(ws)
end
ws.onclose do
@sockets.delete(ws)
end
ws.onerror do
@sockets.delete(ws)
end
end
end
def init_p2p_connections(ws)
@sockets.push(ws)
ws.onmessage do |data, type|
message = JSON.parse(data)
pp 'Received message' + message.to_s
case message['type']
when MessageType::QUERY_LATEST
write(ws, response_latest_msg)
when MessageType::QUERY_ALL
write(ws, response_chain_msg)
when MessageType::RESPONSE_BLOCKCHAIN
handle_blockchain_response(message)
when MessageType::RESPONSE_TRANSACTION_POOL
end
end
write(ws, query_chain_length_msg)
end
def write(ws, message)
ws.send(message.to_json)
end
def broadcast(message)
@sockets.each do |ws|
write(ws, message)
end
end
def broadcast_response_latest_msg
broadcast(response_latest_msg)
end
def broadcast_transaction_pool
broadcast(response_transaction_pool_msg)
end
def response_chain_msg
{ :type => MessageType::RESPONSE_BLOCKCHAIN, :data => @blockchain.blocks }
end
def response_latest_msg
{ :type => MessageType::RESPONSE_BLOCKCHAIN, :data => [@blockchain.get_latest_block] }
end
def query_all_msg
{ :type => MessageType::QUERY_ALL }
end
def query_chain_length_msg
{ :type => MessageType::QUERY_LATEST }
end
def query_transaction_pool_msg
{ :type => MessageType::QUERY_TRANSACTION_POOL }
end
def response_transaction_pool_msg
{ :type => MessageType::RESPONSE_TRANSACTION_POOL, :data => [get_transaction_pool] }
end
def handle_blockchain_response(message)
received_blocks = message['data'].map { |data| Block.from_dic(data) }
latest_block_received = received_blocks[received_blocks.length - 1]
latest_block_held = blockchain.get_latest_block
if latest_block_received.index > latest_block_held.index
pp 'blockchain possibly behind, We got: ' + latest_block_held.index.to_s + ' Peer got: ' + latest_block_received.index.to_s
if latest_block_held.hash == latest_block_received.previous_hash
@blockchain.blocks.push(latest_block_received)
broadcast(response_latest_msg)
elsif received_blocks.length == 1
pp 'we have to query the chain from our peer'
broadcast(query_all_msg)
else
pp 'Received blockchain is longer than current blockchain'
@blockchain.replace_chain(received_blocks) do |b|
broadcast_response_latest_msg
end
end
else
pp 'received blockchain is not longer than current blockchain. Do nothing'
end
end
end