diff --git a/config/defaults.toml b/config/defaults.toml index 7f0b535..3d4f508 100644 --- a/config/defaults.toml +++ b/config/defaults.toml @@ -19,14 +19,8 @@ data_path = "~/.elixium" # port-forwarded) port = 31013 -# Maximum number of outbound connections. Upon failing to connect -# via outbound request, these will default to accepting inbound -# connections -max_bidirectional_connections = 10 - -# Maximum number of inbound connections (outside of any -# bidirectional connections that have defaulted to being inbound) -max_inbound_connections = 90 +# Maximum number of connections to peers. +max_connections = 100 # ------------ End Configuration for Elixium Core ------------ diff --git a/lib/peer/peer_router.ex b/lib/peer/peer_router.ex deleted file mode 100644 index ac2a463..0000000 --- a/lib/peer/peer_router.ex +++ /dev/null @@ -1,223 +0,0 @@ -defmodule ElixiumNode.PeerRouter do - use GenServer - require Logger - alias Elixium.Node.Supervisor, as: Peer - alias Elixium.Node.LedgerManager - alias Elixium.Store.Ledger - alias Elixium.Pool.Orphan - alias Elixium.Block - alias Elixium.Transaction - alias Elixium.Validator - alias Elixium.Store.Oracle - - def start_link(_args) do - GenServer.start_link(__MODULE__, [], name: __MODULE__) - end - - def init(_args), do: {:ok, %{known_transactions: []}} - - # Handles recieved blocks - def handle_info({block = %{type: "BLOCK"}, caller}, state) do - block = Block.sanitize(block) - - state = - case LedgerManager.handle_new_block(block) do - :ok -> - # We've received a valid block. We need to stop mining the block we're - # currently working on and start mining the new one. We also need to gossip - # this block to all the nodes we know of. - Logger.info("Received valid block #{block.hash} at index #{:binary.decode_unsigned(block.index)}.") - Peer.gossip("BLOCK", block) - %{state | known_transactions: state.known_transactions -- [block.transactions]} - - :gossip -> - # For one reason or another, we want to gossip this block without - # restarting our current block calculation. (Perhaps this is a fork block) - Peer.gossip("BLOCK", block) - state - - {:missing_blocks, fork_chain} -> - # We've discovered a fork, but we can't rebuild the fork chain without - # some blocks. Let's request them from our peer. - query_block(:binary.decode_unsigned(hd(fork_chain).index) - 1, caller) - state - - :ignore -> - :ignore # We already know of this block. Ignore it - state - - :invalid -> - Logger.info("Recieved invalid block at index #{:binary.decode_unsigned(block.index)}.") - state - end - - {:noreply, state} - end - - def handle_info({block_query_request = %{type: "BLOCK_QUERY_REQUEST"}, caller}, state) do - send(caller, {"BLOCK_QUERY_RESPONSE", Ledger.block_at_height(block_query_request.index)}) - - {:noreply, state} - end - - def handle_info({block_query_response = %{type: "BLOCK_QUERY_RESPONSE"}, _caller}, state) do - orphans_ahead = - Ledger.last_block().index - |> :binary.decode_unsigned() - |> Kernel.+(1) - |> Orphan.blocks_at_height() - |> length() - - if orphans_ahead > 0 do - # If we have an orphan with an index that is greater than our current latest - # block, we're likely here trying to rebuild the fork chain and have requested - # a block that we're missing. - # TODO: FETCH BLOCKS - end - - {:noreply, state} - end - - # Handles a batch block query request, where another peer has asked this node to send - # all the blocks it has since a given index. - def handle_info({block_query_request = %{type: "BLOCK_BATCH_QUERY_REQUEST"}, caller}, state) do - # TODO: This is a possible DOS vulnerability if an attacker requests a very - # high amount of blocks. Need to figure out a better way to do this; maybe - # we need to limit the maximum amount of blocks a peer is allowed to request. - last_block = Ledger.last_block() - - blocks = - if last_block != :err && block_query_request.starting_at <= :binary.decode_unsigned(last_block.index) do - block_query_request.starting_at - |> Range.new(:binary.decode_unsigned(last_block.index)) - |> Enum.map(&Ledger.block_at_height/1) - |> Enum.filter(&(&1 != :none)) - else - [] - end - - send(caller, {"BLOCK_BATCH_QUERY_RESPONSE", %{blocks: blocks}}) - - {:noreply, state} - end - - # Handles a batch block query response, where we've requested new blocks and are now - # getting a response with potentially new blocks - def handle_info({block_query_response = %{type: "BLOCK_BATCH_QUERY_RESPONSE"}, _caller}, state) do - if length(block_query_response.blocks) > 0 do - Logger.info("Recieved #{length(block_query_response.blocks)} blocks from peer.") - - block_query_response.blocks - |> Enum.with_index() - |> Enum.each(fn {block, i} -> - block = Block.sanitize(block) - - if LedgerManager.handle_new_block(block) == :ok do - IO.write("Syncing blocks #{round(((i + 1) / length(block_query_response.blocks)) * 100)}% [#{i + 1}/#{length(block_query_response.blocks)}]\r") - end - end) - - IO.write("Block Sync Complete") - end - - {:noreply, state} - end - - def handle_info({transaction = %{type: "TRANSACTION"}, _caller}, state) do - transaction = Transaction.sanitize(transaction) - - state = - if Validator.valid_transaction?(transaction) do - if transaction not in state.known_transactions do - <> = transaction.id - Logger.info("Received transaction \e[32m#{shortid}...\e[0m") - Peer.gossip("TRANSACTION", transaction) - - %{state | known_transactions: [transaction | state.known_transactions]} - else - state - end - else - Logger.info("Received Invalid Transaction. Ignoring.") - state - end - - {:noreply, state} - end - - def handle_info({:new_outbound_connection, handler_pid}, state) do - # Let's ask our peer for new blocks, if there - # are any. We'll ask for all blocks starting from our current index minus - # 120 (4 hours worth of blocks before we disconnected) just in case there - # was a fork after we disconnected. - - starting_at = - case Ledger.last_block() do - :err -> 0 - last_block -> - # Current index minus 120 or 1, whichever is greater. - max(0, :binary.decode_unsigned(last_block.index) - 120) - end - - send(handler_pid, {"BLOCK_BATCH_QUERY_REQUEST", %{starting_at: starting_at}}) - - send(handler_pid, {"PEER_QUERY_REQUEST", %{}}) - - {:noreply, state} - end - - def handle_info({:new_inbound_connection, handler_pid}, state) do - send(handler_pid, {"PORT_RECONNECTION_QUERY", %{}}) - send(handler_pid, {"PEER_QUERY_REQUEST", %{}}) - - {:noreply, state} - end - - def handle_info({%{type: "PORT_RECONNECTION_QUERY"}, handler_pid}, state) do - port = Application.get_env(:elixium_core, :port) - - send(handler_pid, {"PORT_RECONNECTION_RESPONSE", %{port: port}}) - - {:noreply, state} - end - - def handle_info({%{type: "PORT_RECONNECTION_RESPONSE", port: port}, handler_pid}, state) do - ip = - handler_pid - |> Process.info() - |> Keyword.get(:dictionary) - |> Keyword.get(:connected) - |> String.to_charlist() - - Oracle.inquire(:"Elixir.Elixium.Store.PeerOracle", {:save_known_peer, [{ip, port}]}) - - {:noreply, state} - end - - def handle_info({%{type: "PEER_QUERY_REQUEST"}, handler_pid}, state) do - peers = - :"Elixir.Elixium.Store.PeerOracle" - |> GenServer.call({:load_known_peers, []}) - |> Enum.take(8) - - send(handler_pid, {"PEER_QUERY_RESPONSE", %{peers: peers}}) - - {:noreply, state} - end - - def handle_info({%{type: "PEER_QUERY_RESPONSE", peers: peers}, _caller}, state) do - Enum.each(peers, fn peer -> - GenServer.call(:"Elixir.Elixium.Store.PeerOracle", {:save_known_peer, [peer]}) - end) - - {:noreply, state} - end - - def handle_info(_, state) do - Logger.warn("Received message that isn't handled by any other case.") - - {:noreply, state} - end - - def query_block(index, caller), do: send(caller, {"BLOCK_QUERY_REQUEST", %{index: index}}) -end diff --git a/lib/peer/router.ex b/lib/peer/router.ex new file mode 100644 index 0000000..ed18d63 --- /dev/null +++ b/lib/peer/router.ex @@ -0,0 +1,167 @@ +defmodule ElixiumNode.Router do + use Pico.Client.Router + require Logger + alias Elixium.Node.LedgerManager + alias Elixium.Store.Ledger + alias Elixium.Pool.Orphan + alias Elixium.Block + alias Elixium.Transaction + alias Elixium.Validator + alias Elixium.Store.Oracle + + message "BLOCK", block do + block = Block.sanitize(block) + + case LedgerManager.handle_new_block(block) do + :ok -> + # We've received a valid block. We need to gossip this block to all the + # nodes we know of. + Logger.info("Received valid block #{block.hash} at index #{:binary.decode_unsigned(block.index)}.") + Pico.broadcast("BLOCK", block) + + known_transactions = SharedState.get(:transactions) || [] -- block.transactions + SharedState.set(:transactions, known_transactions) + + :gossip -> + # For one reason or another, we want to gossip this block. (Perhaps this is a fork block) + Pico.broadcast("BLOCK", block) + + {:missing_blocks, fork_chain} -> + # We've discovered a fork, but we can't rebuild the fork chain without + # some blocks. Let's request them from our peer. + Pico.message(conn, "BLOCK_QUERY_REQUEST", %{index: :binary.decode_unsigned(hd(fork_chain).index) - 1}) + + :invalid -> + Logger.info("Recieved invalid block at index #{:binary.decode_unsigned(block.index)}.") + + :ignore -> :ignore + end + end + + message "BLOCK_QUERY_REQUEST", %{index: index} do + Pico.message(conn, "BLOCK_QUERY_RESPONSE", Ledger.block_at_height(index)) + end + + message "BLOCK_QUERY_RESPONSE", response do + orphans_ahead = + Ledger.last_block().index + |> :binary.decode_unsigned() + |> Kernel.+(1) + |> Orphan.blocks_at_height() + |> length() + + if orphans_ahead > 0 do + # If we have an orphan with an index that is greater than our current latest + # block, we're likely here trying to rebuild the fork chain and have requested + # a block that we're missing. + # TODO: FETCH BLOCKS + end + end + + message "BLOCK_BATCH_QUERY_REQUEST", %{starting_at: start} do + # TODO: This is a possible DOS vulnerability if an attacker requests a very + # high amount of blocks. Need to figure out a better way to do this; maybe + # we need to limit the maximum amount of blocks a peer is allowed to request. + last_block = Ledger.last_block() + + blocks = + if last_block != :err && start <= :binary.decode_unsigned(last_block.index) do + start + |> Range.new(:binary.decode_unsigned(last_block.index)) + |> Enum.map(&Ledger.block_at_height/1) + |> Enum.filter(& &1 != :none) + else + [] + end + + Pico.message(conn, "BLOCK_BATCH_QUERY_RESPONSE", %{blocks: blocks}) + end + + message "BLOCK_BATCH_QUERY_RESPONSE", %{blocks: blocks} do + blocks_count = length(blocks) + + if blocks_count > 0 do + Logger.info("Recieved #{blocks_count} blocks from peer.") + + blocks + |> Enum.with_index() + |> Enum.each(fn {block, i} -> + block = Block.sanitize(block) + + if LedgerManager.handle_new_block(block) == :ok do + IO.write("Syncing blocks #{round(((i + 1) / blocks_count) * 100)}% [#{i + 1}/#{blocks_count}]\r") + end + end) + + IO.write("Block Sync Complete") + end + end + + message "TRANSACTION", transaction do + transaction = Transaction.sanitize(transaction) + known_transactions = SharedState.get(:transactions) || [] + + if Validator.valid_transaction?(transaction) do + if transaction not in known_transactions do + <> = transaction.id + Logger.info("Received transaction \e[32m#{shortid}...\e[0m") + Pico.broadcast("TRANSACTION", transaction) + + SharedState.set(:transactions, [transaction | known_transactions]) + end + else + Logger.info("Received Invalid Transaction. Ignoring.") + end + end + + message "PORT_RECONNECTION_QUERY", _ do + port = Application.get_env(:elixium_core, :port) + + Pico.message(conn, "PORT_RECONNECTION_RESPONSE", %{port: port}) + end + + message "PORT_RECONNECTION_RESPONSE", %{port: port} do + ip = String.to_charlist(handler_state.peername) + + Oracle.inquire(:"Elixir.Elixium.Store.PeerOracle", {:save_known_peer, [{ip, port}]}) + end + + message "PEER_QUERY_REQUEST", _ do + peers = + :"Elixir.Elixium.Store.PeerOracle" + |> GenServer.call({:load_known_peers, []}) + |> Enum.take(8) + + Pico.message(conn, "PEER_QUERY_RESPONSE", %{peers: peers}) + end + + message "PEER_QUERY_RESPONSE", %{peers: peers} do + Enum.each(peers, fn peer -> + GenServer.call(:"Elixir.Elixium.Store.PeerOracle", {:save_known_peer, [peer]}) + end) + end + + message "NEW_INBOUND_CONNECTION", _ do + Pico.message(conn, "PORT_RECONNECTION_QUERY") + Pico.message(conn, "PEER_QUERY_REQUEST") + end + + message "NEW_OUTBOUND_CONNECTION", _ do + # Let's ask our peer for new blocks, if there + # are any. We'll ask for all blocks starting from our current index minus + # 120 (4 hours worth of blocks before we disconnected) just in case there + # was a fork after we disconnected. + + starting_at = + case Ledger.last_block() do + :err -> 0 + last_block -> + # Current index minus 120 or 1, whichever is greater. + max(0, :binary.decode_unsigned(last_block.index) - 120) + end + + Pico.message(conn, "BLOCK_BATCH_QUERY_REQUEST", %{starting_at: starting_at}) + Pico.message(conn, "PEER_QUERY_REQUEST") + end + +end diff --git a/lib/peer/supervisor.ex b/lib/peer/supervisor.ex deleted file mode 100644 index 0edcbfd..0000000 --- a/lib/peer/supervisor.ex +++ /dev/null @@ -1,13 +0,0 @@ -defmodule ElixiumNode.PeerRouter.Supervisor do - use Supervisor - - def start_link(_args) do - Supervisor.start_link(__MODULE__, [], name: __MODULE__) - end - - def init(_args) do - children = [ElixiumNode.PeerRouter] - - Supervisor.init(children, strategy: :one_for_one) - end -end diff --git a/lib/rpc/router.ex b/lib/rpc/router.ex index 77d4b96..5bcc5df 100644 --- a/lib/rpc/router.ex +++ b/lib/rpc/router.ex @@ -49,10 +49,8 @@ defmodule ElixiumNode.RPC.Router do end def get("/connected_nodes") do - Elixium.Node.Supervisor.connected_handlers() - |> Enum.map(&Process.info/1) - |> Enum.map(& Keyword.get(&1, :dictionary)) - |> Enum.map(& Keyword.get(&1, :connected)) + Pico.Client.SharedState.connections() + |> Enum.map(fn {_, ip} -> ip end) |> Poison.encode!() end diff --git a/lib/supervisor.ex b/lib/supervisor.ex index 5a1a4ff..388e972 100644 --- a/lib/supervisor.ex +++ b/lib/supervisor.ex @@ -1,14 +1,20 @@ defmodule ElixiumNode.Supervisor do use Supervisor + alias Elixium.Store.Oracle def start_link do Supervisor.start_link(__MODULE__, [], name: __MODULE__) end def init(_args) do + Oracle.start_link(Elixium.Store.Peer) + port = Application.get_env(:elixium_core, :port) + handlers = Application.get_env(:elixium_core, :max_connections) + peers = Elixium.Store.Peer.find_potential_peers() + children = [ - {Elixium.Node.Supervisor, [:"Elixir.ElixiumNode.PeerRouter"]}, - ElixiumNode.PeerRouter.Supervisor + {Pico.Client.Supervisor, {ElixiumNode.Router, peers, port, handlers}}, + Elixium.HostAvailability.Supervisor ] children = diff --git a/mix.exs b/mix.exs index 5ddb5e4..9834218 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule ElixiumNode.MixProject do def project do [ app: :elixium_node, - version: "1.1.3", + version: "1.2.0", elixir: "~> 1.7", start_permanent: true, deps: deps() @@ -29,6 +29,7 @@ defmodule ElixiumNode.MixProject do defp deps do [ {:elixium_core, "~> 0.6"}, + {:pico, "~> 0.1"}, {:poison, "~> 3.1"}, {:distillery, "~> 2.0"}, {:toml, "~> 0.5"},