-
Notifications
You must be signed in to change notification settings - Fork 2
/
tx_errorsink.go
180 lines (164 loc) · 5.39 KB
/
tx_errorsink.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package types
import (
"time"
lru "github.com/hashicorp/golang-lru"
"github.com/PositionExchange/posichain/internal/utils"
staking "github.com/PositionExchange/posichain/staking/types"
)
const (
plainTxSinkLimit = 1024
stakingTxSinkLimit = 1024
logTag = "[TransactionErrorSink]"
)
// TransactionErrorReport ..
type TransactionErrorReport struct {
TxHashID string `json:"tx-hash-id"`
StakingDirective string `json:"directive-kind,omitempty"`
TimestampOfRejection int64 `json:"time-at-rejection"`
ErrMessage string `json:"error-message"`
}
// TransactionErrorReports ..
type TransactionErrorReports []*TransactionErrorReport
// TransactionErrorSink is where all failed transactions get reported.
// Note that the keys of the lru caches are tx-hash strings.
type TransactionErrorSink struct {
failedPlainTxs *lru.Cache
failedStakingTxs *lru.Cache
}
// NewTransactionErrorSink ..
func NewTransactionErrorSink() *TransactionErrorSink {
failedPlainTx, _ := lru.New(plainTxSinkLimit)
failedStakingTx, _ := lru.New(stakingTxSinkLimit)
return &TransactionErrorSink{
failedPlainTxs: failedPlainTx,
failedStakingTxs: failedStakingTx,
}
}
// Add a transaction to the error sink with the given error
func (sink *TransactionErrorSink) Add(tx PoolTransaction, err error) {
// no-op if no error is provided
if err == nil {
return
}
if plainTx, ok := tx.(*Transaction); ok {
hash := plainTx.Hash().String()
sink.failedPlainTxs.Add(hash, &TransactionErrorReport{
TxHashID: hash,
TimestampOfRejection: time.Now().Unix(),
ErrMessage: err.Error(),
})
utils.Logger().Debug().
Str("tag", logTag).
Interface("tx-hash-id", hash).
Err(err).
Msgf("Added plain transaction error message")
} else if ethTx, ok := tx.(*EthTransaction); ok {
hash := ethTx.Hash().String()
sink.failedPlainTxs.Add(hash, &TransactionErrorReport{
TxHashID: hash,
TimestampOfRejection: time.Now().Unix(),
ErrMessage: err.Error(),
})
utils.Logger().Debug().
Str("tag", logTag).
Interface("tx-hash-id", hash).
Err(err).
Msgf("Added eth transaction error message")
} else if stakingTx, ok := tx.(*staking.StakingTransaction); ok {
hash := stakingTx.Hash().String()
sink.failedStakingTxs.Add(hash, &TransactionErrorReport{
TxHashID: hash,
StakingDirective: stakingTx.StakingType().String(),
TimestampOfRejection: time.Now().Unix(),
ErrMessage: err.Error(),
})
utils.Logger().Debug().
Str("tag", logTag).
Interface("tx-hash-id", hash).
Err(err).
Msgf("Added staking transaction error message")
} else {
utils.Logger().Error().
Str("tag", logTag).
Interface("tx", tx).
Err(err).
Msg("Attempted to add an unknown transaction type")
}
}
// Contains checks if there is an error associated with the given hash
// Note that the keys of the lru caches are tx-hash strings.
func (sink *TransactionErrorSink) Contains(hash string) bool {
return sink.failedPlainTxs.Contains(hash) || sink.failedStakingTxs.Contains(hash)
}
// Remove a transaction's error from the error sink
func (sink *TransactionErrorSink) Remove(tx PoolTransaction) {
if plainTx, ok := tx.(*Transaction); ok {
hash := plainTx.Hash().String()
sink.failedPlainTxs.Remove(hash)
utils.Logger().Debug().
Str("tag", logTag).
Interface("tx-hash-id", hash).
Msgf("Removed plain transaction error message")
} else if ethTx, ok := tx.(*EthTransaction); ok {
hash := ethTx.Hash().String()
sink.failedPlainTxs.Remove(hash)
utils.Logger().Debug().
Str("tag", logTag).
Interface("tx-hash-id", hash).
Msgf("Removed plain transaction error message")
} else if stakingTx, ok := tx.(*staking.StakingTransaction); ok {
hash := stakingTx.Hash().String()
sink.failedStakingTxs.Remove(hash)
utils.Logger().Debug().
Str("tag", logTag).
Interface("tx-hash-id", hash).
Msgf("Removed staking transaction error message")
} else {
utils.Logger().Error().
Str("tag", logTag).
Interface("tx", tx).
Msg("Attempted to remove an unknown transaction type")
}
}
// PlainReport ..
func (sink *TransactionErrorSink) PlainReport() TransactionErrorReports {
return reportErrorsFromLruCache(sink.failedPlainTxs)
}
// StakingReport ..
func (sink *TransactionErrorSink) StakingReport() TransactionErrorReports {
return reportErrorsFromLruCache(sink.failedStakingTxs)
}
// PlainCount ..
func (sink *TransactionErrorSink) PlainCount() int {
return sink.failedPlainTxs.Len()
}
// StakingCount ..
func (sink *TransactionErrorSink) StakingCount() int {
return sink.failedStakingTxs.Len()
}
// reportErrorsFromLruCache is a helper for reporting errors
// from the TransactionErrorSink's lru cache. Do not use this function directly,
// use the respective public methods of TransactionErrorSink.
func reportErrorsFromLruCache(lruCache *lru.Cache) TransactionErrorReports {
rpcErrors := TransactionErrorReports{}
for _, txHash := range lruCache.Keys() {
rpcErrorFetch, ok := lruCache.Get(txHash)
if !ok {
utils.Logger().Warn().
Str("tag", logTag).
Interface("tx-hash-id", txHash).
Msgf("Error not found in sink")
continue
}
rpcError, ok := rpcErrorFetch.(*TransactionErrorReport)
if !ok {
utils.Logger().Error().
Str("tag", logTag).
Interface("tx-hash-id", txHash).
Msgf("Invalid type of value in sink")
continue
}
rpcErrors = append(rpcErrors, rpcError)
}
return rpcErrors
}