diff --git a/internal/config/config.go b/internal/config/config.go index 35b6448..0d9b3e4 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -50,13 +50,14 @@ type MetricsConfig struct { } type IndexerConfig struct { - Network string `yaml:"network" envconfig:"INDEXER_NETWORK"` - NetworkMagic uint32 `yaml:"networkMagic" envconfig:"INDEXER_NETWORK_MAGIC"` - Address string `yaml:"address" envconfig:"INDEXER_TCP_ADDRESS"` - SocketPath string `yaml:"socketPath" envconfig:"INDEXER_SOCKET_PATH"` - InterceptHash string `yaml:"interceptHash" envconfig:"INDEXER_INTERCEPT_HASH"` - InterceptSlot uint64 `yaml:"interceptSlot" envconfig:"INDEXER_INTERCEPT_SLOT"` - Verify bool `yaml:"verify" envconfig:"INDEXER_VERIFY"` + Network string `yaml:"network" envconfig:"INDEXER_NETWORK"` + NetworkMagic uint32 `yaml:"networkMagic" envconfig:"INDEXER_NETWORK_MAGIC"` + Address string `yaml:"address" envconfig:"INDEXER_TCP_ADDRESS"` + SocketPath string `yaml:"socketPath" envconfig:"INDEXER_SOCKET_PATH"` + InterceptHash string `yaml:"interceptHash" envconfig:"INDEXER_INTERCEPT_HASH"` + InterceptSlot uint64 `yaml:"interceptSlot" envconfig:"INDEXER_INTERCEPT_SLOT"` + Verify bool `yaml:"verify" envconfig:"INDEXER_VERIFY"` + HandshakeAddress string `yaml:"handshakeAddress" envconfig:"INDEXER_HANDSHAKE_ADDRESS"` } type StateConfig struct { diff --git a/internal/handshake/covenant.go b/internal/handshake/covenant.go index 4b990df..3f8a8d5 100644 --- a/internal/handshake/covenant.go +++ b/internal/handshake/covenant.go @@ -9,6 +9,7 @@ package handshake import ( "encoding/binary" "errors" + "fmt" "io" ) @@ -66,13 +67,13 @@ func (c *GenericCovenant) Covenant() Covenant { case CovenantTypeRegister: ret, err := NewRegisterCovenantFromGeneric(c) if err != nil { - panic("can't convert generic covenant to Register") + panic(fmt.Sprintf("can't convert generic covenant to Register: %s", err)) } return ret case CovenantTypeUpdate: ret, err := NewUpdateCovenantFromGeneric(c) if err != nil { - panic("can't convert generic covenant to Update") + panic(fmt.Sprintf("can't convert generic covenant to Update: %s", err)) } return ret } @@ -120,7 +121,6 @@ type UpdateCovenant struct { NameHash []byte Height uint32 ResourceData DomainResourceData - BlockHash []byte } func (UpdateCovenant) isCovenant() {} @@ -131,16 +131,14 @@ func NewUpdateCovenantFromGeneric( if gc.Type != CovenantTypeUpdate { return nil, errors.New("wrong covenant type") } - if len(gc.Items) != 4 { + if len(gc.Items) != 3 { return nil, errors.New("incorrect items length") } ret := &UpdateCovenant{ - NameHash: make([]byte, len(gc.Items[0])), - BlockHash: make([]byte, len(gc.Items[3])), + NameHash: make([]byte, len(gc.Items[0])), } // Copy hashes copy(ret.NameHash, gc.Items[0]) - copy(ret.BlockHash, gc.Items[3]) // Decode height from bytes ret.Height = binary.LittleEndian.Uint32(gc.Items[1]) // Decode resource data diff --git a/internal/handshake/covenant_test.go b/internal/handshake/covenant_test.go index 8218b3f..6e3795b 100644 --- a/internal/handshake/covenant_test.go +++ b/internal/handshake/covenant_test.go @@ -113,9 +113,6 @@ func TestCovenantUpdateFromGeneric(t *testing.T) { decodeHex( "0002036e73310a69727677696c6c69616d002ce706b701c00202036e7332c00636d688f601c01a00d5580d0114402ed0125506f35ba249265f39b988d7028a28c300d5580d02200c6c45064c26b529b4ac074dff5de60a99d6025d5b0d7f32c2b8c7d40ec8b3de00d5580d043071cb0417852b08b965413f3b871b033996159d121a585e35111a335d4cfb79b67e49a99c3829f6a1f42e100f7f33d7d9", ), - decodeHex( - "0000000000000000153c62dbcabb762c254fb4104ab7cdd779926b79b34601fc", - ), }, } expectedCovenant := &handshake.UpdateCovenant{ @@ -166,9 +163,6 @@ func TestCovenantUpdateFromGeneric(t *testing.T) { }, }, }, - BlockHash: decodeHex( - "0000000000000000153c62dbcabb762c254fb4104ab7cdd779926b79b34601fc", - ), } tmpCovenant, err := handshake.NewUpdateCovenantFromGeneric( testGenericCovenant, diff --git a/internal/handshake/peer.go b/internal/handshake/peer.go index ad6f598..1f06527 100644 --- a/internal/handshake/peer.go +++ b/internal/handshake/peer.go @@ -99,6 +99,7 @@ func (p *Peer) Close() error { return err } p.conn = nil + // Close done channel to signify shutdown close(p.doneCh) return nil } @@ -108,6 +109,11 @@ func (p *Peer) ErrorChan() <-chan error { return p.errorCh } +// DoneChan returns the shutdown channel +func (p *Peer) DoneChan() <-chan struct{} { + return p.doneCh +} + // setupConnection runs the initial handshake and starts the receive loop func (p *Peer) setupConnection() error { // Init channels for async messages @@ -187,6 +193,12 @@ func (p *Peer) recvLoop() { } }() if err != nil { + // Don't return an async error if we're already shutting down + select { + case <-p.doneCh: + return + default: + } p.errorCh <- err _ = p.Close() } @@ -473,6 +485,12 @@ func (p *Peer) Sync(locator [][32]byte, syncFunc SyncFunc) error { } }() if err != nil { + // Don't return an async error if we're already shutting down + select { + case <-p.doneCh: + return + default: + } p.errorCh <- err _ = p.Close() } diff --git a/internal/indexer/handshake.go b/internal/indexer/handshake.go new file mode 100644 index 0000000..d072587 --- /dev/null +++ b/internal/indexer/handshake.go @@ -0,0 +1,123 @@ +// Copyright 2025 Blink Labs Software +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. + +package indexer + +import ( + "fmt" + "log/slog" + "time" + + "github.com/blinklabs-io/cdnsd/internal/config" + "github.com/blinklabs-io/cdnsd/internal/handshake" +) + +type handshakeState struct { + peer *handshake.Peer + peerAddress string + peerBackoffDelay time.Duration + blockHeight int +} + +func (i *Indexer) startHandshake() error { + cfg := config.GetConfig() + if cfg.Indexer.HandshakeAddress == "" { + return nil + } + i.handshakeState.peerAddress = cfg.Indexer.HandshakeAddress + // Start peer (re)connect loop + go i.handshakeReconnectPeer() + return nil +} + +func (i *Indexer) handshakeConnectPeer() error { + slog.Info("connecting to Handshake peer", "address", i.handshakeState.peerAddress) + p, err := handshake.NewPeer(nil, handshake.NetworkMainnet) + if err != nil { + return err + } + i.handshakeState.peer = p + if err := i.handshakeState.peer.Connect(i.handshakeState.peerAddress); err != nil { + return err + } + // Async error handler + go func() { + select { + case err := <-i.handshakeState.peer.ErrorChan(): + slog.Error( + "Handshake peer disconnected", + "error", + err, + ) + case <-i.handshakeState.peer.DoneChan(): + // Stop waiting on connection shutdown + } + }() + // Start sync + if err := i.handshakeState.peer.Sync(nil, i.handshakeHandleSync); err != nil { + _ = i.handshakeState.peer.Close() + return err + } + return nil +} + +func (i *Indexer) handshakeReconnectPeer() { + var err error + // Try reconnecting to peer until we are successful + for { + err = i.handshakeConnectPeer() + if err == nil { + // Reset backoff delay + i.handshakeState.peerBackoffDelay = 0 + // Wait for connection close + <-i.handshakeState.peer.DoneChan() + continue + } + if i.handshakeState.peerBackoffDelay == 0 { + // Set initial backoff delay + i.handshakeState.peerBackoffDelay = 1 * time.Second + } else { + // Double backoff delay + i.handshakeState.peerBackoffDelay *= 2 + } + // Don't delay longer than 2m + if i.handshakeState.peerBackoffDelay > 120*time.Second { + i.handshakeState.peerBackoffDelay = 120 * time.Second + } + slog.Error( + "connection to Handshake peer failed", + "error", + err, + "delay", + i.handshakeState.peerBackoffDelay.String(), + ) + time.Sleep(i.handshakeState.peerBackoffDelay) + } +} + +func (i *Indexer) handshakeHandleSync(block *handshake.Block) error { + i.handshakeState.blockHeight++ + slog.Debug( + "synced Handshake block", + "height", i.handshakeState.blockHeight, + "hash", fmt.Sprintf("%x", block.Hash()), + "prevHash", fmt.Sprintf("%x", block.Header.PrevBlock), + ) + // Process transactions + for _, tx := range block.Transactions { + // Process outputs + for _, output := range tx.Outputs { + cov := output.Covenant.Covenant() + switch c := cov.(type) { + case *handshake.RegisterCovenant: + slog.Debug("Handshake domain registration", "resdata", c.ResourceData) + case *handshake.UpdateCovenant: + slog.Debug("Handshake domain update", "resdata", c.ResourceData) + } + } + } + return nil +} diff --git a/internal/indexer/indexer.go b/internal/indexer/indexer.go index 7f1675f..7743233 100644 --- a/internal/indexer/indexer.go +++ b/internal/indexer/indexer.go @@ -55,12 +55,13 @@ type Domain struct { } type Indexer struct { - pipeline *pipeline.Pipeline - domains map[string]Domain - tipReached bool - syncLogTimer *time.Timer - syncStatus input_chainsync.ChainSyncStatus - watched []watchedAddr + pipeline *pipeline.Pipeline + domains map[string]Domain + tipReached bool + syncLogTimer *time.Timer + syncStatus input_chainsync.ChainSyncStatus + watched []watchedAddr + handshakeState handshakeState } type watchedAddr struct { @@ -76,6 +77,16 @@ var globalIndexer = &Indexer{ } func (i *Indexer) Start() error { + if err := i.startCardano(); err != nil { + return err + } + if err := i.startHandshake(); err != nil { + return err + } + return nil +} + +func (i *Indexer) startCardano() error { // Build watched addresses from enabled profiles cfg := config.GetConfig() for _, profile := range config.GetProfiles() {