-
Notifications
You must be signed in to change notification settings - Fork 0
/
report.go
252 lines (227 loc) · 6.5 KB
/
report.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
package report
import (
"math"
"sort"
"sync"
"time"
"github.com/furychain/furyint/test/loadtime/payload"
"github.com/furychain/furyint/types"
"github.com/gofrs/uuid"
"gonum.org/v1/gonum/stat"
)
// BlockStore defines the set of methods needed by the report generator from
// Tendermint's store.Blockstore type. Using an interface allows for tests to
// more easily simulate the required behavior without having to use the more
// complex real API.
type BlockStore interface {
	// Height returns the height of the highest block in the store
	// (inclusive upper bound of the block walk).
	Height() uint64
	// Base returns the height of the lowest block in the store
	// (starting point of the block walk).
	Base() uint64
	// LoadBlock fetches the block at the given height.
	LoadBlock(uint64) (*types.Block, error)
}
// DataPoint contains the set of data collected for each transaction.
type DataPoint struct {
	// Duration is the latency between the send time stamped into the
	// payload and the time of the block that included the transaction.
	Duration time.Duration
	// BlockTime is the timestamp of the block that included the transaction.
	BlockTime time.Time
	// Hash is the transaction hash.
	Hash []byte
}
// Report contains the data calculated from reading the timestamped transactions
// of each block found in the blockstore.
type Report struct {
	// ID is the experiment UUID embedded in each payload.
	ID uuid.UUID
	// Rate, Connections, and Size echo the load-generation parameters
	// carried in the payloads of this experiment.
	Rate, Connections, Size uint64
	// Latency statistics computed over the data points in All.
	Max, Min, Avg, StdDev time.Duration

	// NegativeCount is the number of negative durations encountered while
	// reading the transaction data. A negative duration means that
	// a transaction timestamp was greater than the timestamp of the block it
	// was included in and likely indicates an issue with the experimental
	// setup.
	NegativeCount int

	// TPS is calculated by taking the highest averaged TPS over all consecutive blocks
	TPS uint64

	// All contains all data points gathered from all valid transactions.
	// The order of the contents of All is not guaranteed to match the order
	// of transactions in the chain.
	All []DataPoint

	// sum of all durations; used for calculating average during report creation.
	sum int64
}
// Reports is a collection of Report objects.
type Reports struct {
	// s indexes reports by experiment UUID while data points are ingested.
	s map[uuid.UUID]Report
	// l is the flattened list of reports produced by calculateAll.
	l []Report

	// errorCount is the number of parsing errors encountered while reading the
	// transaction data. Parsing errors may occur if a transaction not generated
	// by the payload package is submitted to the chain.
	errorCount int
}
// List returns a slice of all reports, in the order produced by calculateAll.
func (rs *Reports) List() []Report {
	reports := rs.l
	return reports
}
// ErrorCount returns the number of erroneous transactions encountered while
// creating the report.
func (rs *Reports) ErrorCount() int {
	return rs.errorCount
}
// addDataPoint records one transaction measurement under the experiment
// identified by id, creating that experiment's Report on first use and
// updating its running max/min/sum statistics and negative-latency count.
func (rs *Reports) addDataPoint(id uuid.UUID, l time.Duration, bt time.Time, hash []byte, conns, rate, size uint64) {
	rep, found := rs.s[id]
	if !found {
		// First data point for this experiment: seed Min with the largest
		// possible duration so any real latency replaces it.
		rep = Report{
			ID:          id,
			Connections: conns,
			Rate:        rate,
			Size:        size,
			Max:         0,
			Min:         math.MaxInt64,
		}
		rs.s[id] = rep
	}
	rep.All = append(rep.All, DataPoint{Duration: l, BlockTime: bt, Hash: hash})
	if rep.Max < l {
		rep.Max = l
	}
	if rep.Min > l {
		rep.Min = l
	}
	if l < 0 {
		rep.NegativeCount++
	}
	// Using an int64 here makes an assumption about the scale and quantity of the data we are processing.
	// If all latencies were 2 seconds, we would need around 4 billion records to overflow this.
	// We are therefore assuming that the data does not exceed these bounds.
	rep.sum += int64(l)
	rs.s[id] = rep
}
// calculateAll finalizes every report accumulated in rs.s — computing the
// average, standard deviation, and peak TPS from the gathered data points —
// and flattens the results into rs.l.
func (rs *Reports) calculateAll() {
	rs.l = make([]Report, 0, len(rs.s))
	for _, rep := range rs.s {
		n := len(rep.All)
		if n == 0 {
			// No valid transactions: clear the math.MaxInt64 sentinel so an
			// empty report presents a zero minimum.
			rep.Min = 0
		} else {
			rep.Avg = time.Duration(rep.sum / int64(n))
			rep.StdDev = time.Duration(int64(stat.StdDev(toFloat(rep.All), nil)))
			rep.TPS = calculateTPS(rep.All)
		}
		rs.l = append(rs.l, rep)
	}
}
// calculateTPS calculates the TPS by taking an average over a moving window
// with a minimum span of one second across all consecutive blocks, and
// returning the highest rate observed.
func calculateTPS(in []DataPoint) uint64 {
	// Tally the number of transactions landing in each block timestamp.
	txPerBlock := make(map[time.Time]int)
	for _, dp := range in {
		txPerBlock[dp.BlockTime]++
	}

	// Collect the distinct block times and order them chronologically.
	times := make([]time.Time, 0, len(txPerBlock))
	for t := range txPerBlock {
		times = append(times, t)
	}
	sort.Slice(times, func(a, b int) bool {
		return times[a].Before(times[b])
	})

	// Start a window at every block and extend it forward one block at a
	// time, tracking the best rate across windows longer than one second.
	var best uint64
	for i, start := range times {
		count := txPerBlock[start]
		for _, end := range times[i+1:] {
			count += txPerBlock[end]
			if span := end.Sub(start); span > time.Second {
				if tps := uint64(float64(count) / span.Seconds()); tps > best {
					best = tps
				}
			}
		}
	}
	return best
}
// addError tallies one transaction that could not be parsed as a payload.
func (rs *Reports) addError() {
	rs.errorCount = rs.errorCount + 1
}
// GenerateFromBlockStore creates a Report using the data in the provided
// BlockStore.
//
// It walks every block from s.Base() through s.Height() (inclusive), parses
// each transaction as a loadtime payload on a pool of worker goroutines, and
// groups the resulting latency data points by the experiment UUID embedded in
// each payload. Transactions that fail to parse are counted via addError
// rather than aborting the run.
func GenerateFromBlockStore(s BlockStore) (*Reports, error) {
	// payloadData carries the result of parsing one transaction; err is set
	// (and the remaining fields left zero) when parsing failed.
	type payloadData struct {
		id                      uuid.UUID
		l                       time.Duration
		bt                      time.Time
		hash                    []byte
		connections, rate, size uint64
		err                     error
	}
	// txData pairs a raw transaction with the timestamp of its block.
	type txData struct {
		tx types.Tx
		bt time.Time
	}
	reports := &Reports{
		s: make(map[uuid.UUID]Report),
	}
	// Deserializing to proto can be slow but does not depend on other data
	// and can therefore be done in parallel.
	// Deserializing in parallel does mean that the resulting data is
	// not guaranteed to be delivered in the same order it was given to the
	// worker pool.
	const poolSize = 16
	txc := make(chan txData)
	pdc := make(chan payloadData, poolSize)
	wg := &sync.WaitGroup{}
	wg.Add(poolSize)
	// Worker pool: parse transactions from txc and push results to pdc.
	for i := 0; i < poolSize; i++ {
		go func() {
			defer wg.Done()
			for b := range txc {
				p, err := payload.FromBytes(b.tx)
				if err != nil {
					pdc <- payloadData{err: err}
					continue
				}
				// Latency is the block time minus the send time stamped
				// into the payload; negative values are tallied later by
				// addDataPoint as NegativeCount.
				l := b.bt.Sub(p.Time)
				// NOTE(review): this slice-to-array-pointer conversion
				// (Go 1.17+) panics unless p.Id is exactly 16 bytes —
				// presumably guaranteed by the payload generator; verify.
				idb := (*[16]byte)(p.Id)
				pdc <- payloadData{
					l:           l,
					bt:          b.bt,
					hash:        b.tx.Hash(),
					id:          uuid.UUID(*idb),
					connections: p.Connections,
					rate:        p.Rate,
					size:        p.GetSize_(),
				}
			}
		}()
	}
	// Close the results channel once every worker has drained txc.
	go func() {
		wg.Wait()
		close(pdc)
	}()
	// Producer: stream every transaction of every block into the pool.
	go func() {
		for i := s.Base(); i <= s.Height(); i++ {
			cur, err := s.LoadBlock(i)
			if err != nil {
				// NOTE(review): a load failure panics inside this goroutine
				// instead of surfacing through the function's error return —
				// consider propagating it to the caller.
				panic(err)
			}
			for _, tx := range cur.Data.Txs {
				// Header.Time is converted as a unix-nanosecond value;
				// presumably it is stored as nanoseconds — confirm against
				// the types package.
				UTCfromUnixNano := time.Unix(0, int64(cur.Header.Time))
				txc <- txData{tx: tx, bt: UTCfromUnixNano}
			}
		}
		close(txc)
	}()
	// Collect parsed results until the workers close pdc, then finalize.
	for pd := range pdc {
		if pd.err != nil {
			reports.addError()
			continue
		}
		reports.addDataPoint(pd.id, pd.l, pd.bt, pd.hash, pd.connections, pd.rate, pd.size)
	}
	reports.calculateAll()
	return reports, nil
}
// toFloat converts each data point's latency into a float64 nanosecond count
// suitable for gonum's statistics helpers.
func toFloat(in []DataPoint) []float64 {
	out := make([]float64, len(in))
	for idx := range in {
		out[idx] = float64(int64(in[idx].Duration))
	}
	return out
}