-
Notifications
You must be signed in to change notification settings - Fork 0
/
indexer.go
193 lines (167 loc) · 4.95 KB
/
indexer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
package indexer
import (
"context"
"fmt"
"os"
"sync"
"sync/atomic"
"time"
"github.com/ArkeoNetwork/common/logging"
arkutils "github.com/ArkeoNetwork/common/utils"
"github.com/ArkeoNetwork/directory/pkg/db"
"github.com/pkg/errors"
tmlog "github.com/tendermint/tendermint/libs/log"
tmclient "github.com/tendermint/tendermint/rpc/client/http"
tmtypes "github.com/tendermint/tendermint/types"
)
// log is the package-wide logger, obtained from logging.WithoutFields.
var log = logging.WithoutFields()
// IndexerAppParams holds the settings an IndexerApp is built from; NewIndexer
// stores a copy of it on the returned IndexerApp.
type IndexerAppParams struct {
// NOTE(review): not referenced in this part of the file; presumably the base
// address of the Arkeo API - confirm against its users.
ArkeoApi string
// NOTE(review): not referenced in this part of the file; presumably the
// tendermint RPC address - confirm against its users.
TendermintApi string
// TendermintWs is the address handed to arkutils.NewTendermintClient by
// gapFiller, fillGap and realtime.
TendermintWs string
// NOTE(review): the remaining chain settings are not referenced in this part
// of the file; their exact meaning should be confirmed against their users.
ChainID string
Bech32PrefixAccAddr string
Bech32PrefixAccPub string
IndexerID int64
// DBConfig is embedded and passed to db.New by NewIndexer.
db.DBConfig
}
// IndexerApp indexes blocks into the directory database. Run starts a
// realtime consumer and a gap filler, each in its own goroutine.
type IndexerApp struct {
// Height is set by handleBlockEvent to the height of the block it has just
// inserted.
Height int64
// NOTE(review): IsSynced is not written anywhere in this part of the file;
// confirm where it is maintained.
IsSynced atomic.Bool
// params is the configuration given to NewIndexer.
params IndexerAppParams
// db is the database handle opened by NewIndexer.
db *db.DirectoryDB
// done is created by Run; realtime sends one value on it after consumeEvents
// returns.
done chan struct{}
}
// NewIndexer opens the database described by params.DBConfig and returns an
// IndexerApp bound to it. It panics when the database cannot be opened.
func NewIndexer(params IndexerAppParams) *IndexerApp {
	conn, err := db.New(params.DBConfig)
	if err != nil {
		panic(fmt.Sprintf("error connecting to the db: %+v", err))
	}

	app := &IndexerApp{params: params}
	app.db = conn
	return app
}
// Run launches realtime indexing and gap filling, each in its own goroutine,
// and returns immediately. The returned channel receives a value from
// realtime once consumeEvents returns; the returned error is always nil.
func (a *IndexerApp) Run() (done <-chan struct{}, err error) {
	// initialize by reading all existing providers?
	finished := make(chan struct{})
	a.done = finished

	go a.realtime()
	go a.gapFiller()

	return finished, nil
}
// NewTenderm1intClient builds a tendermint HTTP client for baseURL using the
// "/websocket" endpoint and attaches a logger that writes to standard output.
// The client is returned without being started.
func NewTenderm1intClient(baseURL string) (*tmclient.HTTP, error) {
	c, err := tmclient.New(baseURL, "/websocket")
	if err != nil {
		return nil, errors.Wrapf(err, "error creating websocket client")
	}

	stdout := tmlog.NewSyncWriter(os.Stdout)
	c.SetLogger(tmlog.NewTMLogger(stdout))
	return c, nil
}
// fillThreads is the upper bound on the number of goroutines gapFiller starts
// to fill gaps in a single round.
const fillThreads = 3
// gaps holds the block gaps gapFiller is working on; it is reassigned from
// the database at the start of every gapFiller round.
var gaps []*db.BlockGap
// gapFiller loops forever, back-filling blocks that are missing from the
// database and then sleeping for a minute before the next round.
//
// Each round takes the gaps reported by the database and adds the range
// between the latest stored block and the current chain head (or the whole
// chain, starting at height 1, when nothing is stored yet). The gaps are then
// handed to at most fillThreads concurrent workers, each calling fillGap.
//
// It panics when the tendermint client cannot be created, or when the latest
// stored block or the chain head cannot be read. A failure to read the gaps
// from the database is only logged.
func (a *IndexerApp) gapFiller() {
	tm, err := arkutils.NewTendermintClient(a.params.TendermintWs)
	if err != nil {
		log.Panicf("error creating gapFiller client: %+v", err)
	}

	ctx := context.Background()
	for {
		gaps, err = a.db.FindBlockGaps()
		if err != nil {
			log.Errorf("error reading blocks from db: %+v", err)
		}

		latestStored, err := a.db.FindLatestBlock()
		if err != nil {
			log.Panicf("error finding latest stored block: %+v", err)
		}

		latest, err := tm.Block(ctx, nil)
		if err != nil {
			log.Panicf("error finding latest block: %+v", err)
		}

		if latestStored == nil {
			log.Infof("no latestStored, initializing")
			gaps = append(gaps, &db.BlockGap{Start: 1, End: latest.Block.Height})
		} else if latest.Block.Height-latestStored.Height > 1 {
			log.Infof("%d missed blocks from %d to current %d", latest.Block.Height-latestStored.Height, latestStored.Height, latest.Block.Height)
			gaps = append(gaps, &db.BlockGap{Start: latestStored.Height + 1, End: latest.Block.Height - 1})
		}

		if len(gaps) > 0 {
			log.Infof("have %d gaps to fill: %s", len(gaps), gaps)

			// The queue is sized to this round's workload so that queueing
			// every gap before the workers start can never block, no matter
			// how many gaps were found. Closing it lets the workers drain it
			// with range and exit on their own.
			workChan := make(chan *db.BlockGap, len(gaps))
			for _, g := range gaps {
				workChan <- g
			}
			close(workChan)

			startThreads := len(gaps)
			if startThreads > fillThreads {
				startThreads = fillThreads
			}

			var wg sync.WaitGroup
			wg.Add(startThreads)
			for i := 0; i < startThreads; i++ {
				go func() {
					defer wg.Done()
					for g := range workChan {
						if err := a.fillGap(*g); err != nil {
							log.Errorf("error filling gap %s: %+v", g, err)
						}
					}
					log.Infof("no work delivered, done")
				}()
			}

			log.Infof("waiting for %d threads to complete filling %d gaps", startThreads, len(gaps))
			wg.Wait()
		}

		// all gaps filled, wait a minute
		time.Sleep(time.Minute)
	}
}
// fillGap indexes every block height in gap, from gap.Start through gap.End
// inclusive.
//
// A block that cannot be consumed or inserted is logged and skipped so the
// rest of the gap is still attempted; such failures are not reflected in the
// return value. An error is returned only when the tendermint client cannot
// be created.
func (a *IndexerApp) fillGap(gap db.BlockGap) error {
	log.Infof("gap filling %s", gap)

	tm, err := arkutils.NewTendermintClient(a.params.TendermintWs)
	if err != nil {
		// Wrap already appends the cause to the message, so the cause is not
		// formatted into the message text a second time.
		return errors.Wrap(err, "error creating tm client")
	}

	for i := gap.Start; i <= gap.End; i++ {
		log.Infof("processing %d", i)

		block, err := a.consumeHistoricalBlock(tm, i)
		if err != nil {
			log.Errorf("error consuming block %d: %+v", i, err)
			continue
		}

		if _, err = a.db.InsertBlock(block); err != nil {
			log.Errorf("error inserting block %d with hash %s: %+v", block.Height, block.Hash, err)
			// Pause before the next height; presumably a back-off for a
			// struggling database - the failed insert is not retried.
			time.Sleep(time.Second)
		}
	}
	return nil
}
// numClients is the number of tendermint clients realtime creates and starts
// before handing them to consumeEvents.
const numClients = 3
// realtime creates and starts numClients tendermint clients against
// a.params.TendermintWs, passes them to consumeEvents, and sends a value on
// a.done once consumeEvents returns. Every started client is stopped when
// realtime returns. It panics if a client cannot be created or started.
func (a *IndexerApp) realtime() {
	endpoint := a.params.TendermintWs
	log.Infof("starting realtime indexing using /websocket at %s", endpoint)

	clients := make([]*tmclient.HTTP, numClients)
	for idx := range clients {
		c, err := arkutils.NewTendermintClient(endpoint)
		if err != nil {
			panic(fmt.Sprintf("error creating tm client for %s: %+v", endpoint, err))
		}
		if err = c.Start(); err != nil {
			panic(fmt.Sprintf("error starting ws client: %s: %+v", endpoint, err))
		}
		// Deferred on purpose: the clients must stay up until realtime returns.
		defer c.Stop()
		clients[idx] = c
	}

	a.consumeEvents(clients)
	a.done <- struct{}{}
}
// handleBlockEvent stores the height, hash and time of block in the database
// and, on success, records the block's height in a.Height. It returns a
// wrapped error when the insert fails, leaving a.Height unchanged.
func (a *IndexerApp) handleBlockEvent(block *tmtypes.Block) error {
	record := &db.Block{
		Height:    block.Height,
		Hash:      block.Hash().String(),
		BlockTime: block.Time,
	}

	_, err := a.db.InsertBlock(record)
	if err != nil {
		return errors.Wrapf(err, "error inserting block")
	}

	a.Height = block.Height
	return nil
}