-
Notifications
You must be signed in to change notification settings - Fork 2.2k
/
replaced_transaction.ex
134 lines (113 loc) · 3.65 KB
/
replaced_transaction.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
defmodule Indexer.Fetcher.ReplacedTransaction do
@moduledoc """
Finds and updates replaced transactions.
"""
use Indexer.Fetcher, restart: :permanent
use Spandex.Decorators
require Logger
alias Explorer.Chain
alias Explorer.Chain.Hash
alias Indexer.{BufferedTask, Tracer}
alias Indexer.Fetcher.ReplacedTransaction.Supervisor, as: ReplacedTransactionSupervisor
@behaviour BufferedTask
@max_batch_size 10
@max_concurrency 4
@defaults [
flush_interval: :timer.seconds(3),
max_concurrency: @max_concurrency,
max_batch_size: @max_batch_size,
task_supervisor: Indexer.Fetcher.ReplacedTransaction.TaskSupervisor,
metadata: [fetcher: :replaced_transaction]
]
@spec async_fetch([
%{
required(:nonce) => non_neg_integer,
required(:from_address_hash) => Hash.Address.t(),
required(:block_hash) => Hash.Full.t()
}
]) :: :ok
def async_fetch(transactions_fields, timeout \\ 5000) when is_list(transactions_fields) do
if ReplacedTransactionSupervisor.disabled?() do
:ok
else
entries = Enum.map(transactions_fields, &entry/1)
BufferedTask.buffer(__MODULE__, entries, timeout)
end
end
@doc false
def child_spec([init_options, gen_server_options]) do
merged_init_opts =
@defaults
|> Keyword.merge(init_options)
|> Keyword.put(:state, {})
Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_opts}, gen_server_options]}, id: __MODULE__)
end
@impl BufferedTask
def init(initial, reducer, _) do
{:ok, final} =
[:block_hash, :nonce, :from_address_hash, :hash]
|> Chain.stream_pending_transactions(
initial,
fn transaction_fields, acc ->
transaction_fields
|> pending_entry()
|> reducer.(acc)
end,
true
)
final
end
defp entry(%{
block_hash: %Hash{bytes: block_hash_bytes},
nonce: nonce,
from_address_hash: %Hash{bytes: from_address_hash_bytes}
})
when is_integer(nonce) do
{block_hash_bytes, nonce, from_address_hash_bytes}
end
defp pending_entry(%{hash: %Hash{bytes: hash}, nonce: nonce, from_address_hash: %Hash{bytes: from_address_hash_bytes}}) do
{:pending, nonce, from_address_hash_bytes, hash}
end
defp params({block_hash_bytes, nonce, from_address_hash_bytes})
when is_integer(nonce) do
{:ok, from_address_hash} = Hash.Address.cast(from_address_hash_bytes)
{:ok, block_hash} = Hash.Full.cast(block_hash_bytes)
%{nonce: nonce, from_address_hash: from_address_hash, block_hash: block_hash}
end
defp pending_params({:pending, nonce, from_address_hash, hash}) do
%{nonce: nonce, from_address_hash: from_address_hash, hash: hash}
end
@impl BufferedTask
@decorate trace(
name: "fetch",
resource: "Indexer.Fetcher.ReplacedTransaction.run/2",
service: :indexer,
tracer: Tracer
)
def run(entries, _) do
Logger.debug("fetching replaced transactions for transactions")
try do
{pending, realtime} =
entries
|> Enum.split_with(fn entry ->
match?({:pending, _, _, _}, entry)
end)
pending
|> Enum.map(&pending_params/1)
|> Chain.find_and_update_replaced_transactions()
realtime
|> Enum.map(¶ms/1)
|> Chain.update_replaced_transactions()
:ok
rescue
reason ->
Logger.error(fn ->
[
"failed to update replaced transactions for transactions: ",
Exception.format(:error, reason, __STACKTRACE__)
]
end)
{:retry, entries}
end
end
end