Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,14 @@ type MetricsConfig struct {
}

type IndexerConfig struct {
Network string `yaml:"network" envconfig:"INDEXER_NETWORK"`
NetworkMagic uint32 `yaml:"networkMagic" envconfig:"INDEXER_NETWORK_MAGIC"`
Address string `yaml:"address" envconfig:"INDEXER_TCP_ADDRESS"`
SocketPath string `yaml:"socketPath" envconfig:"INDEXER_SOCKET_PATH"`
InterceptHash string `yaml:"interceptHash" envconfig:"INDEXER_INTERCEPT_HASH"`
InterceptSlot uint64 `yaml:"interceptSlot" envconfig:"INDEXER_INTERCEPT_SLOT"`
Verify bool `yaml:"verify" envconfig:"INDEXER_VERIFY"`
Network string `yaml:"network" envconfig:"INDEXER_NETWORK"`
NetworkMagic uint32 `yaml:"networkMagic" envconfig:"INDEXER_NETWORK_MAGIC"`
Address string `yaml:"address" envconfig:"INDEXER_TCP_ADDRESS"`
SocketPath string `yaml:"socketPath" envconfig:"INDEXER_SOCKET_PATH"`
InterceptHash string `yaml:"interceptHash" envconfig:"INDEXER_INTERCEPT_HASH"`
InterceptSlot uint64 `yaml:"interceptSlot" envconfig:"INDEXER_INTERCEPT_SLOT"`
Verify bool `yaml:"verify" envconfig:"INDEXER_VERIFY"`
HandshakeAddress string `yaml:"handshakeAddress" envconfig:"INDEXER_HANDSHAKE_ADDRESS"`
}

type StateConfig struct {
Expand Down
12 changes: 5 additions & 7 deletions internal/handshake/covenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package handshake
import (
"encoding/binary"
"errors"
"fmt"
"io"
)

Expand Down Expand Up @@ -66,13 +67,13 @@ func (c *GenericCovenant) Covenant() Covenant {
case CovenantTypeRegister:
ret, err := NewRegisterCovenantFromGeneric(c)
if err != nil {
panic("can't convert generic covenant to Register")
panic(fmt.Sprintf("can't convert generic covenant to Register: %s", err))
}
return ret
case CovenantTypeUpdate:
ret, err := NewUpdateCovenantFromGeneric(c)
if err != nil {
panic("can't convert generic covenant to Update")
panic(fmt.Sprintf("can't convert generic covenant to Update: %s", err))
}
return ret
}
Expand Down Expand Up @@ -120,7 +121,6 @@ type UpdateCovenant struct {
NameHash []byte
Height uint32
ResourceData DomainResourceData
BlockHash []byte
}

func (UpdateCovenant) isCovenant() {}
Expand All @@ -131,16 +131,14 @@ func NewUpdateCovenantFromGeneric(
if gc.Type != CovenantTypeUpdate {
return nil, errors.New("wrong covenant type")
}
if len(gc.Items) != 4 {
if len(gc.Items) != 3 {
return nil, errors.New("incorrect items length")
}
ret := &UpdateCovenant{
NameHash: make([]byte, len(gc.Items[0])),
BlockHash: make([]byte, len(gc.Items[3])),
NameHash: make([]byte, len(gc.Items[0])),
}
// Copy hashes
copy(ret.NameHash, gc.Items[0])
copy(ret.BlockHash, gc.Items[3])
// Decode height from bytes
ret.Height = binary.LittleEndian.Uint32(gc.Items[1])
// Decode resource data
Expand Down
6 changes: 0 additions & 6 deletions internal/handshake/covenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,6 @@ func TestCovenantUpdateFromGeneric(t *testing.T) {
decodeHex(
"0002036e73310a69727677696c6c69616d002ce706b701c00202036e7332c00636d688f601c01a00d5580d0114402ed0125506f35ba249265f39b988d7028a28c300d5580d02200c6c45064c26b529b4ac074dff5de60a99d6025d5b0d7f32c2b8c7d40ec8b3de00d5580d043071cb0417852b08b965413f3b871b033996159d121a585e35111a335d4cfb79b67e49a99c3829f6a1f42e100f7f33d7d9",
),
decodeHex(
"0000000000000000153c62dbcabb762c254fb4104ab7cdd779926b79b34601fc",
),
},
}
expectedCovenant := &handshake.UpdateCovenant{
Expand Down Expand Up @@ -166,9 +163,6 @@ func TestCovenantUpdateFromGeneric(t *testing.T) {
},
},
},
BlockHash: decodeHex(
"0000000000000000153c62dbcabb762c254fb4104ab7cdd779926b79b34601fc",
),
}
tmpCovenant, err := handshake.NewUpdateCovenantFromGeneric(
testGenericCovenant,
Expand Down
18 changes: 18 additions & 0 deletions internal/handshake/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func (p *Peer) Close() error {
return err
}
p.conn = nil
// Close done channel to signify shutdown
close(p.doneCh)
return nil
}
Expand All @@ -108,6 +109,11 @@ func (p *Peer) ErrorChan() <-chan error {
return p.errorCh
}

// DoneChan returns the shutdown channel
func (p *Peer) DoneChan() <-chan struct{} {
return p.doneCh
}

// setupConnection runs the initial handshake and starts the receive loop
func (p *Peer) setupConnection() error {
// Init channels for async messages
Expand Down Expand Up @@ -187,6 +193,12 @@ func (p *Peer) recvLoop() {
}
}()
if err != nil {
// Don't return an async error if we're already shutting down
select {
case <-p.doneCh:
return
default:
}
p.errorCh <- err
_ = p.Close()
}
Expand Down Expand Up @@ -473,6 +485,12 @@ func (p *Peer) Sync(locator [][32]byte, syncFunc SyncFunc) error {
}
}()
if err != nil {
// Don't return an async error if we're already shutting down
select {
case <-p.doneCh:
return
default:
}
p.errorCh <- err
_ = p.Close()
}
Expand Down
123 changes: 123 additions & 0 deletions internal/indexer/handshake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2025 Blink Labs Software
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

package indexer

import (
"fmt"
"log/slog"
"time"

"github.com/blinklabs-io/cdnsd/internal/config"
"github.com/blinklabs-io/cdnsd/internal/handshake"
)

type handshakeState struct {
peer *handshake.Peer
peerAddress string
peerBackoffDelay time.Duration
blockHeight int
}

func (i *Indexer) startHandshake() error {
cfg := config.GetConfig()
if cfg.Indexer.HandshakeAddress == "" {
return nil
}
i.handshakeState.peerAddress = cfg.Indexer.HandshakeAddress
// Start peer (re)connect loop
go i.handshakeReconnectPeer()
return nil
}

func (i *Indexer) handshakeConnectPeer() error {
slog.Info("connecting to Handshake peer", "address", i.handshakeState.peerAddress)
p, err := handshake.NewPeer(nil, handshake.NetworkMainnet)
if err != nil {
return err
}
i.handshakeState.peer = p
if err := i.handshakeState.peer.Connect(i.handshakeState.peerAddress); err != nil {
return err
}
// Async error handler
go func() {
select {
case err := <-i.handshakeState.peer.ErrorChan():
slog.Error(
"Handshake peer disconnected",
"error",
err,
)
case <-i.handshakeState.peer.DoneChan():
// Stop waiting on connection shutdown
}
}()
// Start sync
if err := i.handshakeState.peer.Sync(nil, i.handshakeHandleSync); err != nil {
_ = i.handshakeState.peer.Close()
return err
}
return nil
}

func (i *Indexer) handshakeReconnectPeer() {
var err error
// Try reconnecting to peer until we are successful
for {
err = i.handshakeConnectPeer()
if err == nil {
// Reset backoff delay
i.handshakeState.peerBackoffDelay = 0
// Wait for connection close
<-i.handshakeState.peer.DoneChan()
continue
}
if i.handshakeState.peerBackoffDelay == 0 {
// Set initial backoff delay
i.handshakeState.peerBackoffDelay = 1 * time.Second
} else {
// Double backoff delay
i.handshakeState.peerBackoffDelay *= 2
}
// Don't delay longer than 2m
if i.handshakeState.peerBackoffDelay > 120*time.Second {
i.handshakeState.peerBackoffDelay = 120 * time.Second
}
slog.Error(
"connection to Handshake peer failed",
"error",
err,
"delay",
i.handshakeState.peerBackoffDelay.String(),
)
time.Sleep(i.handshakeState.peerBackoffDelay)
}
}

func (i *Indexer) handshakeHandleSync(block *handshake.Block) error {
i.handshakeState.blockHeight++
slog.Debug(
"synced Handshake block",
"height", i.handshakeState.blockHeight,
"hash", fmt.Sprintf("%x", block.Hash()),
"prevHash", fmt.Sprintf("%x", block.Header.PrevBlock),
)
// Process transactions
for _, tx := range block.Transactions {
// Process outputs
for _, output := range tx.Outputs {
cov := output.Covenant.Covenant()
switch c := cov.(type) {
case *handshake.RegisterCovenant:
slog.Debug("Handshake domain registration", "resdata", c.ResourceData)
case *handshake.UpdateCovenant:
slog.Debug("Handshake domain update", "resdata", c.ResourceData)
}
}
}
return nil
}
23 changes: 17 additions & 6 deletions internal/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,13 @@ type Domain struct {
}

type Indexer struct {
pipeline *pipeline.Pipeline
domains map[string]Domain
tipReached bool
syncLogTimer *time.Timer
syncStatus input_chainsync.ChainSyncStatus
watched []watchedAddr
pipeline *pipeline.Pipeline
domains map[string]Domain
tipReached bool
syncLogTimer *time.Timer
syncStatus input_chainsync.ChainSyncStatus
watched []watchedAddr
handshakeState handshakeState
}

type watchedAddr struct {
Expand All @@ -76,6 +77,16 @@ var globalIndexer = &Indexer{
}

func (i *Indexer) Start() error {
if err := i.startCardano(); err != nil {
return err
}
if err := i.startHandshake(); err != nil {
return err
}
return nil
}

func (i *Indexer) startCardano() error {
// Build watched addresses from enabled profiles
cfg := config.GetConfig()
for _, profile := range config.GetProfiles() {
Expand Down