-
Notifications
You must be signed in to change notification settings - Fork 21
/
mem_table_loader.ex
138 lines (111 loc) · 3.86 KB
/
mem_table_loader.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
defmodule ArchEthic.P2P.MemTableLoader do
@moduledoc false
use GenServer
alias ArchEthic.DB
alias ArchEthic.P2P
alias ArchEthic.P2P.GeoPatch
alias ArchEthic.P2P.MemTable
alias ArchEthic.P2P.Node
alias ArchEthic.SharedSecrets
alias ArchEthic.TransactionChain
alias ArchEthic.TransactionChain.Transaction
alias ArchEthic.TransactionChain.Transaction.ValidationStamp
alias ArchEthic.TransactionChain.TransactionData
alias ArchEthic.TransactionChain.TransactionData.Keys
require Logger
def start_link(args \\ []) do
GenServer.start_link(__MODULE__, args, name: __MODULE__)
end
def init(_args) do
nodes_transactions =
DB.list_transactions_by_type(:node, [
:address,
:type,
:previous_public_key,
data: [:content],
validation_stamp: [:timestamp]
])
node_shared_secret_transactions =
DB.list_transactions_by_type(:node_shared_secrets, [
:address,
:type,
data: [:keys],
validation_stamp: [:timestamp]
])
|> Enum.at(0)
nodes_transactions
|> Stream.concat([node_shared_secret_transactions])
|> Stream.filter(& &1)
|> Enum.sort_by(& &1.validation_stamp.timestamp, {:asc, DateTime})
|> Enum.each(&load_transaction/1)
Enum.each(DB.get_last_p2p_summaries(), &load_p2p_summary/1)
{:ok, %{}}
end
@doc """
Load the transaction and update the P2P view
"""
@spec load_transaction(Transaction.t()) :: :ok
def load_transaction(%Transaction{
address: address,
type: :node,
previous_public_key: previous_public_key,
data: %TransactionData{content: content},
validation_stamp: %ValidationStamp{
timestamp: timestamp
}
}) do
Logger.info("Loading transaction into P2P mem table",
transaction_address: Base.encode16(address),
transaction_type: :node
)
first_public_key = TransactionChain.get_first_public_key(previous_public_key)
{:ok, ip, port, transport, reward_address, _} = Node.decode_transaction_content(content)
node = %Node{
ip: ip,
port: port,
first_public_key: first_public_key,
last_public_key: previous_public_key,
geo_patch: GeoPatch.from_ip(ip),
transport: transport,
last_address: address,
reward_address: reward_address
}
if first_node_change?(first_public_key, previous_public_key) do
node
|> Node.enroll(timestamp)
|> MemTable.add_node()
else
MemTable.add_node(node)
end
Logger.info("Node loaded into in memory p2p tables", node: Base.encode16(first_public_key))
end
def load_transaction(%Transaction{
address: address,
type: :node_shared_secrets,
data: %TransactionData{keys: keys},
validation_stamp: %ValidationStamp{
timestamp: timestamp
}
}) do
Logger.info("Loading transaction into P2P mem table",
transaction_address: Base.encode16(address),
transaction_type: :node_shared_secrets
)
new_authorized_keys = Keys.list_authorized_public_keys_at(keys, 0)
previous_authorized_keys = P2P.authorized_nodes() |> Enum.map(& &1.last_public_key)
unauthorized_keys = previous_authorized_keys -- new_authorized_keys
Enum.each(unauthorized_keys, &MemTable.unauthorize_node/1)
new_authorized_keys
|> Enum.map(&MemTable.get_first_node_key/1)
|> Enum.each(&MemTable.authorize_node(&1, SharedSecrets.next_application_date(timestamp)))
end
def load_transaction(_), do: :ok
defp first_node_change?(first_key, previous_key) when first_key == previous_key, do: true
defp first_node_change?(_, _), do: false
defp load_p2p_summary({node_public_key, {available?, avg_availability}}) do
if available? do
MemTable.set_node_available(node_public_key)
end
MemTable.update_node_average_availability(node_public_key, avg_availability)
end
end