Skip to content

Commit c3edd89

Browse files
committed
feat: initial Handshake indexer support
Signed-off-by: Aurora Gaffney <aurora@blinklabs.io>
1 parent 27a3f30 commit c3edd89

File tree

4 files changed

+138
-13
lines changed

4 files changed

+138
-13
lines changed

internal/config/config.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,14 @@ type MetricsConfig struct {
5050
}
5151

5252
type IndexerConfig struct {
53-
Network string `yaml:"network" envconfig:"INDEXER_NETWORK"`
54-
NetworkMagic uint32 `yaml:"networkMagic" envconfig:"INDEXER_NETWORK_MAGIC"`
55-
Address string `yaml:"address" envconfig:"INDEXER_TCP_ADDRESS"`
56-
SocketPath string `yaml:"socketPath" envconfig:"INDEXER_SOCKET_PATH"`
57-
InterceptHash string `yaml:"interceptHash" envconfig:"INDEXER_INTERCEPT_HASH"`
58-
InterceptSlot uint64 `yaml:"interceptSlot" envconfig:"INDEXER_INTERCEPT_SLOT"`
59-
Verify bool `yaml:"verify" envconfig:"INDEXER_VERIFY"`
53+
Network string `yaml:"network" envconfig:"INDEXER_NETWORK"`
54+
NetworkMagic uint32 `yaml:"networkMagic" envconfig:"INDEXER_NETWORK_MAGIC"`
55+
Address string `yaml:"address" envconfig:"INDEXER_TCP_ADDRESS"`
56+
SocketPath string `yaml:"socketPath" envconfig:"INDEXER_SOCKET_PATH"`
57+
InterceptHash string `yaml:"interceptHash" envconfig:"INDEXER_INTERCEPT_HASH"`
58+
InterceptSlot uint64 `yaml:"interceptSlot" envconfig:"INDEXER_INTERCEPT_SLOT"`
59+
Verify bool `yaml:"verify" envconfig:"INDEXER_VERIFY"`
60+
HandshakeAddress string `yaml:"handshakeAddress" envconfig:"INDEXER_HANDSHAKE_ADDRESS"`
6061
}
6162

6263
type StateConfig struct {

internal/handshake/peer.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,10 @@ func (p *Peer) Close() error {
9999
return err
100100
}
101101
p.conn = nil
102+
// Close done channel to signify shutdown internally
102103
close(p.doneCh)
104+
// Close error channel to signify shutdown to consumer
105+
close(p.errorCh)
103106
return nil
104107
}
105108

@@ -187,6 +190,11 @@ func (p *Peer) recvLoop() {
187190
}
188191
}()
189192
if err != nil {
193+
// Don't return an async error if we're already shutting down
194+
select {
195+
case <-p.doneCh:
196+
return
197+
}
190198
p.errorCh <- err
191199
_ = p.Close()
192200
}
@@ -473,6 +481,11 @@ func (p *Peer) Sync(locator [][32]byte, syncFunc SyncFunc) error {
473481
}
474482
}()
475483
if err != nil {
484+
// Don't return an async error if we're already shutting down
485+
select {
486+
case <-p.doneCh:
487+
return
488+
}
476489
p.errorCh <- err
477490
_ = p.Close()
478491
}

internal/indexer/handshake.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// Copyright 2025 Blink Labs Software
2+
//
3+
// Use of this source code is governed by an MIT-style
4+
// license that can be found in the LICENSE file or at
5+
// https://opensource.org/licenses/MIT.
6+
7+
package indexer
8+
9+
import (
10+
"fmt"
11+
"log/slog"
12+
"time"
13+
14+
"github.com/blinklabs-io/cdnsd/internal/config"
15+
"github.com/blinklabs-io/cdnsd/internal/handshake"
16+
)
17+
18+
type handshakeState struct {
19+
peer *handshake.Peer
20+
peerAddress string
21+
peerBackoffDelay time.Duration
22+
blockHeight int
23+
}
24+
25+
func (i *Indexer) startHandshake() error {
26+
cfg := config.GetConfig()
27+
if cfg.Indexer.HandshakeAddress == "" {
28+
return nil
29+
}
30+
i.handshakeState.peerAddress = cfg.Indexer.HandshakeAddress
31+
if err := i.handshakeConnectPeer(); err != nil {
32+
return err
33+
}
34+
return nil
35+
}
36+
37+
func (i *Indexer) handshakeConnectPeer() error {
38+
slog.Info("connecting to Handshake peer", "address", i.handshakeState.peerAddress)
39+
p, err := handshake.NewPeer(nil, handshake.NetworkMainnet)
40+
if err != nil {
41+
return err
42+
}
43+
i.handshakeState.peer = p
44+
if err := i.handshakeState.peer.Connect(i.handshakeState.peerAddress); err != nil {
45+
return err
46+
}
47+
// Async error handler
48+
go func() {
49+
err := <-i.handshakeState.peer.ErrorChan()
50+
if err != nil {
51+
slog.Error(
52+
"peer disconnected",
53+
"error",
54+
err,
55+
)
56+
}
57+
// Try reconnecting to peer until we are successful
58+
for {
59+
if err := i.handshakeConnectPeer(); err == nil {
60+
i.handshakeState.peerBackoffDelay = 0
61+
return
62+
}
63+
if i.handshakeState.peerBackoffDelay == 0 {
64+
// Set initial backoff delay
65+
i.handshakeState.peerBackoffDelay = 1 * time.Second
66+
} else {
67+
// Double backoff delay
68+
i.handshakeState.peerBackoffDelay *= 2
69+
}
70+
// Don't delay longer than 2m
71+
if i.handshakeState.peerBackoffDelay > 120*time.Second {
72+
i.handshakeState.peerBackoffDelay = 120 * time.Second
73+
}
74+
slog.Error(
75+
"connection to Handshake peer failed",
76+
"error",
77+
err,
78+
"delay",
79+
i.handshakeState.peerBackoffDelay.String(),
80+
)
81+
time.Sleep(i.handshakeState.peerBackoffDelay)
82+
}
83+
}()
84+
// Start sync
85+
if err := i.handshakeState.peer.Sync(nil, i.handshakeHandleSync); err != nil {
86+
return err
87+
}
88+
return nil
89+
}
90+
91+
func (i *Indexer) handshakeHandleSync(block *handshake.Block) error {
92+
i.handshakeState.blockHeight++
93+
slog.Debug(
94+
"synced Handshake block",
95+
"height", i.handshakeState.blockHeight,
96+
"hash", fmt.Sprintf("%x", block.Hash()),
97+
"prevHash", fmt.Sprintf("%x", block.Header.PrevBlock),
98+
)
99+
return nil
100+
}

internal/indexer/indexer.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,13 @@ type Domain struct {
5555
}
5656

5757
type Indexer struct {
58-
pipeline *pipeline.Pipeline
59-
domains map[string]Domain
60-
tipReached bool
61-
syncLogTimer *time.Timer
62-
syncStatus input_chainsync.ChainSyncStatus
63-
watched []watchedAddr
58+
pipeline *pipeline.Pipeline
59+
domains map[string]Domain
60+
tipReached bool
61+
syncLogTimer *time.Timer
62+
syncStatus input_chainsync.ChainSyncStatus
63+
watched []watchedAddr
64+
handshakeState handshakeState
6465
}
6566

6667
type watchedAddr struct {
@@ -76,6 +77,16 @@ var globalIndexer = &Indexer{
7677
}
7778

7879
func (i *Indexer) Start() error {
80+
if err := i.startCardano(); err != nil {
81+
return err
82+
}
83+
if err := i.startHandshake(); err != nil {
84+
return err
85+
}
86+
return nil
87+
}
88+
89+
func (i *Indexer) startCardano() error {
7990
// Build watched addresses from enabled profiles
8091
cfg := config.GetConfig()
8192
for _, profile := range config.GetProfiles() {

0 commit comments

Comments
 (0)