diff --git a/cmd/chaintracks/main.go b/cmd/chaintracks/main.go index 01819208..334e7a3d 100644 --- a/cmd/chaintracks/main.go +++ b/cmd/chaintracks/main.go @@ -49,7 +49,47 @@ func main() { panic(err) } + listenForTipHeaders(server, ctx) + listenForReorgs(server, ctx) + if err := server.ListenAndServe(ctx); err != nil { panic(err) } } + +func listenForTipHeaders(server *chaintracks.Server, ctx context.Context) { + newTipHeadersChan, unsubscribe := server.Service.SubscribeHeaders() + + go func() { + for { + select { + case header := <-newTipHeadersChan: + slog.Default().Info("New tip header received", slog.Uint64("height", uint64(header.Height)), slog.String("hash", header.Hash)) + case <-ctx.Done(): + unsubscribe() + return + } + } + }() +} + +func listenForReorgs(server *chaintracks.Server, ctx context.Context) { + reorgChan, unsubscribe := server.Service.SubscribeReorgs() + + go func() { + for { + select { + case reorg := <-reorgChan: + slog.Default().Info("Reorg detected", + slog.Uint64("new_tip_height", uint64(reorg.NewTip.Height)), + slog.String("new_tip_hash", reorg.NewTip.Hash), + slog.Uint64("old_tip_height", uint64(reorg.OldTip.Height)), + slog.String("old_tip_hash", reorg.OldTip.Hash), + ) + case <-ctx.Done(): + unsubscribe() + return + } + } + }() +} diff --git a/pkg/defs/chaintracks.go b/pkg/defs/chaintracks.go index 8fa83232..0c2e5310 100644 --- a/pkg/defs/chaintracks.go +++ b/pkg/defs/chaintracks.go @@ -25,6 +25,9 @@ type ChaintracksServiceConfig struct { LiveIngestors []LiveIngestorType `mapstructure:"live_ingestors"` BulkIngestors []BulkIngestorConfig `mapstructure:"bulk_ingestors"` WocAPIKey string `mapstructure:"woc_api_key"` + + AddLiveRecursionLimit uint `mapstructure:"add_live_recursion_limit"` + LiveHeightThreshold uint `mapstructure:"live_height_threshold"` } // Validate checks if the Chain field in ChaintracksServiceConfig holds a valid BSV network type. @@ -52,6 +55,10 @@ func (c *ChaintracksServiceConfig) Validate() error { } } + if c.AddLiveRecursionLimit > 100 || c.AddLiveRecursionLimit > c.LiveHeightThreshold { + return fmt.Errorf("add_live_recursion_limit must be less than or equal to live_height_threshold and not exceed 100") + } + return nil } @@ -73,7 +80,9 @@ func DefaultChaintracksServiceConfig() ChaintracksServiceConfig { Type: WhatsOnChainCDN, }, }, - WocAPIKey: "", + WocAPIKey: "", + AddLiveRecursionLimit: 10, + LiveHeightThreshold: 2000, } } diff --git a/pkg/services/chaintracks/bulk_headers_container.go b/pkg/services/chaintracks/bulk_headers_container.go index 93bbcc3b..b9ce6005 100644 --- a/pkg/services/chaintracks/bulk_headers_container.go +++ b/pkg/services/chaintracks/bulk_headers_container.go @@ -137,7 +137,7 @@ func (b *bulkHeadersContainer) MaxHeightAtChunk(index int) int { func (b *bulkHeadersContainer) FindHeaderForHeight(height uint) (*wdk.ChainBlockHeader, error) { chunkIndex := b.getIndexForHeight(must.ConvertToIntFromUnsigned(height)) if chunkIndex >= len(b.chunks) { - return nil, nil + return nil, nil // not found } chunk := b.chunks[chunkIndex] @@ -147,7 +147,7 @@ func (b *bulkHeadersContainer) FindHeaderForHeight(height uint) (*wdk.ChainBlock headerDataEnd := headerDataStart + 80 if headerDataEnd > len(chunk.data) { - return nil, fmt.Errorf("header data end index %d exceeds chunk data length %d", headerDataEnd, len(chunk.data)) + return nil, nil // not found } headerData := chunk.data[headerDataStart:headerDataEnd] diff --git a/pkg/services/chaintracks/bulk_manager.go b/pkg/services/chaintracks/bulk_manager.go index 9af165f3..ffa077da 100644 --- a/pkg/services/chaintracks/bulk_manager.go +++ b/pkg/services/chaintracks/bulk_manager.go @@ -33,7 +33,7 @@ func newBulkManager(logger *slog.Logger, bulkIngestors []NamedBulkIngestor, chai } } -func (bm *bulkManager) SyncBulkStorage(ctx context.Context, presentHeight uint, initialRanges models.HeightRanges) error { +func (bm *bulkManager) SyncBulkStorage(ctx context.Context, presentHeight uint, initialRanges models.HeightRanges, liveHeightThreshold uint) error { if presentHeight <= liveHeightThreshold { bm.logger.Info("Skipping bulk synchronization - present height below live height threshold", slog.Any("present_height", presentHeight), slog.Any("live_height_threshold", liveHeightThreshold)) return nil @@ -114,6 +114,30 @@ func (bm *bulkManager) GetFileDataByIndex(fileID int) (*ingest.BulkFileData, err return bm.container.GetFileDataByIndex(fileID) } +func (bm *bulkManager) MigrateFromLiveHeaders(ctx context.Context, liveHeaders []*models.LiveBlockHeader) error { + bm.locker.Lock() + defer bm.locker.Unlock() + + // create data slice + data := make([]byte, 0, len(liveHeaders)*80) + for _, header := range liveHeaders { + headerBytes, err := header.Bytes() + if err != nil { + return fmt.Errorf("failed to convert live header at height %d to bytes: %w", header.Height, err) + } + data = append(data, headerBytes...) + } + + // add to container + heightRange := models.NewHeightRange(liveHeaders[0].Height, liveHeaders[len(liveHeaders)-1].Height) + err := bm.container.Add(ctx, data, heightRange) + if err != nil { + return fmt.Errorf("failed to add live headers to bulk container: %w", err) + } + + return nil +} + func (bm *bulkManager) processBulkChunks(ctx context.Context, bulkChunks []ingest.BulkHeaderMinimumInfo, downloader ingest.BulkFileDownloader, maxHeight uint) error { chunksToLoad := bm.getChunksToLoad(bulkChunks) type chunkWithInfo struct { @@ -180,7 +204,7 @@ func (bm *bulkManager) shouldAddNewFile(info *ingest.BulkHeaderMinimumInfo) bool return !rangeToAdd.IsEmpty() } -func (bm *bulkManager) GetGapHeadersAsLive(ctx context.Context, presentHeight uint, liveInitialRange models.HeightRange) ([]wdk.ChainBlockHeader, error) { +func (bm *bulkManager) GetGapHeadersAsLive(ctx context.Context, presentHeight uint, liveInitialRange models.HeightRange, addLiveRecursionLimit uint) ([]wdk.ChainBlockHeader, error) { var newLiveHeaders []wdk.ChainBlockHeader maxBulkHeight := bm.GetHeightRange().MaxHeight minLiveHeight := presentHeight diff --git a/pkg/services/chaintracks/chaintracks_handler.go b/pkg/services/chaintracks/chaintracks_handler.go index 687df051..7be97ca9 100644 --- a/pkg/services/chaintracks/chaintracks_handler.go +++ b/pkg/services/chaintracks/chaintracks_handler.go @@ -3,6 +3,7 @@ package chaintracks import ( "bytes" "encoding/json" + "errors" "fmt" "log/slog" "net/http" @@ -13,6 +14,7 @@ import ( "github.com/bsv-blockchain/go-wallet-toolbox/pkg/internal/logging" servercommon "github.com/bsv-blockchain/go-wallet-toolbox/pkg/internal/server" "github.com/bsv-blockchain/go-wallet-toolbox/pkg/services/chaintracks/models" + "github.com/bsv-blockchain/go-wallet-toolbox/pkg/wdk" "github.com/go-softwarelab/common/pkg/to" ) @@ -119,6 +121,11 @@ func (h *Handler) handleFindChainTipHeader(w http.ResponseWriter, r *http.Reques w.Header().Set("Content-Type", "application/json") tipHeader, err := h.service.FindChainTipHeader(r.Context()) + if errors.Is(err, wdk.ErrNotFoundError) { + http.Error(w, "Chain tip not found", http.StatusNotFound) + return + } + if err != nil { h.logger.Error("failed to find chain tip header hex", slog.String("error", err.Error())) http.Error(w, "Internal Server Error", http.StatusInternalServerError) @@ -137,6 +144,11 @@ func (h *Handler) handleFindTipHashHex(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") tipHash, err := h.service.FindChainTipHeader(r.Context()) + if errors.Is(err, wdk.ErrNotFoundError) { + http.Error(w, "Chain tip not found", http.StatusNotFound) + return + } + if err != nil { h.logger.Error("failed to find chain tip hash hex", slog.String("error", err.Error())) http.Error(w, "Internal Server Error", http.StatusInternalServerError) @@ -167,6 +179,11 @@ func (h *Handler) handleFindHeaderHexForHeight(w http.ResponseWriter, r *http.Re } header, err := h.service.FindHeaderForHeight(r.Context(), height) + if errors.Is(err, wdk.ErrNotFoundError) { + http.Error(w, "Header not found for the specified height", http.StatusNotFound) + return + } + if err != nil { h.logger.Error("failed to find header hex for height", slog.String("error", err.Error())) http.Error(w, "Internal Server Error", http.StatusInternalServerError) diff --git a/pkg/services/chaintracks/chaintracks_service.go b/pkg/services/chaintracks/chaintracks_service.go index bf06e9a0..de62c35b 100644 --- a/pkg/services/chaintracks/chaintracks_service.go +++ b/pkg/services/chaintracks/chaintracks_service.go @@ -15,19 +15,16 @@ import ( "github.com/bsv-blockchain/go-wallet-toolbox/pkg/services/chaintracks/internal" "github.com/bsv-blockchain/go-wallet-toolbox/pkg/services/chaintracks/models" "github.com/bsv-blockchain/go-wallet-toolbox/pkg/wdk" + "github.com/go-softwarelab/common/pkg/must" "github.com/go-softwarelab/common/pkg/slices" "github.com/go-softwarelab/common/pkg/to" ) const ( - // TODO: constants below, can be made configurable if needed - liveHeadersChanSize = 1000 - lastPresentHeightTTL = 60 * time.Second - cdnSyncRepeatDuration = 24 * time.Hour - syncCheckInterval = 1 * time.Second - addLiveRecursionLimit = 11 - halfLiveRecursionLimit = addLiveRecursionLimit / 2 - liveHeightThreshold = 2000 + liveHeadersChanSize = 1000 + lastPresentHeightTTL = 60 * time.Second + cdnSyncRepeatDuration = 24 * time.Hour + syncCheckInterval = 1 * time.Second ) // Service provides core functionality for the Chaintracks service with logging and configuration support. @@ -53,6 +50,9 @@ type Service struct { lastSyncCheck time.Time lastBulkSync time.Time + + headerCallbacks *internal.PubSubEvents[wdk.ChainBlockHeader] + reorgCallbacks *internal.PubSubEvents[models.ReorgEvent] } // NewService creates and returns a new Service instance initialized with the provided logger and configuration. @@ -82,6 +82,8 @@ func NewService(logger *slog.Logger, config defs.ChaintracksServiceConfig, overr liveIngestors: liveIngestors, liveHeadersChan: make(chan wdk.ChainBlockHeader, liveHeadersChanSize), bulkMgr: newBulkManager(logger, bulkIngestors, config.Chain), + headerCallbacks: internal.NewPubSubEvents[wdk.ChainBlockHeader](logger), + reorgCallbacks: internal.NewPubSubEvents[models.ReorgEvent](logger), } srv.cachedPresentHeight = internal.NewCachableWithTTL[uint](lastPresentHeightTTL, srv.fetchLatestPresentHeight) @@ -265,6 +267,20 @@ func (s *Service) BulkFileDataByIndex(index int) (*ingest.BulkFileData, error) { return s.bulkMgr.GetFileDataByIndex(index) } +// SubscribeHeaders subscribes to notifications of new ChainBlockHeader events as they are published by the service. +// It returns a receive-only channel for ChainBlockHeader messages and an unsubscribe function to stop receiving updates. +// Unsubscribing will remove the subscriber and close the associated channel to clean up resources. +func (s *Service) SubscribeHeaders() (readOnlyChan <-chan wdk.ChainBlockHeader, unsubscribe func()) { + return s.headerCallbacks.Subscribe() +} + +// SubscribeReorgs returns a read-only channel for receiving chain reorganization events and an unsubscribe function. +// The channel delivers ReorgEvent values whenever a chain reorg is detected, providing details of the old and new tip. +// Call the returned unsubscribe function to stop receiving events and release resources associated with the subscription. +func (s *Service) SubscribeReorgs() (readOnlyChan <-chan models.ReorgEvent, unsubscribe func()) { + return s.reorgCallbacks.Subscribe() +} + func (s *Service) getMissingBlockHeader(ctx context.Context, hash string) *wdk.ChainBlockHeader { for _, liveIngestor := range s.liveIngestors { header, err := liveIngestor.Ingestor.GetHeaderByHash(ctx, hash) @@ -320,7 +336,7 @@ func (s *Service) syncBulkStorage(ctx context.Context, presentHeight uint, initi } }() - if err := s.bulkMgr.SyncBulkStorage(ctx, presentHeight, initialRanges); err != nil { + if err := s.bulkMgr.SyncBulkStorage(ctx, presentHeight, initialRanges, s.config.LiveHeightThreshold); err != nil { return fmt.Errorf("bulk synchronization failed: %w", err) } @@ -376,6 +392,10 @@ func (s *Service) shiftLiveHeaders(ctx context.Context) error { if err := s.processHeaders(ctx); err != nil { return fmt.Errorf("failed to process live headers during live headers shift: %w", err) } + + if err := s.migrateLiveToBulk(ctx); err != nil { + return fmt.Errorf("failed to migrate live headers to bulk during live headers shift: %w", err) + } return nil } @@ -384,11 +404,11 @@ func (s *Service) skipBulkSync(presentHeight uint, ranges models.HeightRanges) b return false } - return ranges.Live.NotEmpty() && ranges.Live.MaxHeight >= presentHeight-halfLiveRecursionLimit + return ranges.Live.NotEmpty() && ranges.Live.MaxHeight >= presentHeight-(s.config.AddLiveRecursionLimit/2) } func (s *Service) fillGapLiveHeaders(ctx context.Context, presentHeight uint, liveInitialRange models.HeightRange) error { - gapHeaders, err := s.bulkMgr.GetGapHeadersAsLive(ctx, presentHeight, liveInitialRange) + gapHeaders, err := s.bulkMgr.GetGapHeadersAsLive(ctx, presentHeight, liveInitialRange, s.config.AddLiveRecursionLimit) if err != nil { return fmt.Errorf("failed to get gap headers as live during live headers shift: %w", err) } @@ -398,7 +418,7 @@ func (s *Service) fillGapLiveHeaders(ctx context.Context, presentHeight uint, li } for _, header := range gapHeaders { - if err := s.addLiveHeader(ctx, header); err != nil { + if _, err := s.addLiveHeader(ctx, header); err != nil { s.logger.Warn("Chaintracks service - failed to add gap header as live", slog.String("header_hash", header.Hash), slog.String("error", err.Error())) } } @@ -407,89 +427,102 @@ func (s *Service) fillGapLiveHeaders(ctx context.Context, presentHeight uint, li } func (s *Service) processHeaders(ctx context.Context) error { + addedHeadersHashes := make(map[string]struct{}) for { select { case <-ctx.Done(): return nil case header := <-s.liveHeadersChan: - if err := s.addLiveHeader(ctx, header); err != nil { + if added, err := s.addLiveHeader(ctx, header); err != nil { s.logger.Warn("Chaintracks service - failed to add live header", slog.String("header_hash", header.Hash), slog.String("error", err.Error())) + } else if added { + addedHeadersHashes[header.Hash] = struct{}{} } default: // No more headers to process + + // Publish chain tip if it was added during this processing cycle + if activeTip, err := s.storage.Query(ctx).GetActiveTipLiveHeader(); err != nil { + return fmt.Errorf("failed to get active tip live header after processing headers: %w", err) + } else if activeTip != nil { + if _, ok := addedHeadersHashes[activeTip.Hash]; ok { + s.headerCallbacks.Publish(activeTip.ChainBlockHeader) + } + } + + // invalidate cached present height to force refresh on next request + s.cachedPresentHeight.Invalidate() + return nil } } } -func (s *Service) addLiveHeader(ctx context.Context, header wdk.ChainBlockHeader) error { - err := s.storeLiveHeader(ctx, header) +func (s *Service) addLiveHeader(ctx context.Context, header wdk.ChainBlockHeader) (added bool, err error) { + added, err = s.storeLiveHeader(ctx, header) if errors.Is(err, errNoPrev) { - if err := s.addLiveHeaderRecursive(ctx, header, addLiveRecursionLimit); err != nil { - return fmt.Errorf("failed to add header recursively: %w", err) - } + added, err = s.addLiveHeaderRecursive(ctx, header, must.ConvertToIntFromUnsigned(s.config.AddLiveRecursionLimit)) } if err != nil { - return fmt.Errorf("failed to store header: %w", err) + return added, fmt.Errorf("failed to store header: %w", err) } - return nil + return added, nil } -func (s *Service) addLiveHeaderRecursive(ctx context.Context, header wdk.ChainBlockHeader, depth int) error { +func (s *Service) addLiveHeaderRecursive(ctx context.Context, header wdk.ChainBlockHeader, depth int) (bool, error) { if depth <= 0 { - return fmt.Errorf("recursion limit reached while adding header recursively for header: %s", header.Hash) + return false, fmt.Errorf("recursion limit reached while adding header recursively for header: %s", header.Hash) } prevHeader := s.getMissingBlockHeader(ctx, header.PreviousHash) if prevHeader == nil { - return fmt.Errorf("previous header not found for hash: %s", header.PreviousHash) + return false, fmt.Errorf("previous header not found for hash: %s", header.PreviousHash) } - err := s.storeLiveHeader(ctx, *prevHeader) + _, err := s.storeLiveHeader(ctx, *prevHeader) if errors.Is(err, errNoPrev) { - if err := s.addLiveHeaderRecursive(ctx, *prevHeader, depth-1); err != nil { - return fmt.Errorf("failed to add previous header recursively: %w", err) - } + _, err = s.addLiveHeaderRecursive(ctx, *prevHeader, depth-1) } if err != nil { - return fmt.Errorf("failed to store previous header: %w", err) + return false, fmt.Errorf("failed to store previous header: %w", err) } - if err := s.storeLiveHeader(ctx, header); err != nil { - return fmt.Errorf("failed to store header after adding previous: %w", err) + added, err := s.storeLiveHeader(ctx, header) + if err != nil { + return false, fmt.Errorf("failed to store header after adding previous: %w", err) } - return nil + return added, nil } var errNoPrev = fmt.Errorf("no previous header found") -func (s *Service) storeLiveHeader(ctx context.Context, header wdk.ChainBlockHeader) (err error) { +func (s *Service) storeLiveHeader(ctx context.Context, header wdk.ChainBlockHeader) (added bool, err error) { if header.Height == 0 { - return fmt.Errorf("handling genesis block header is not supported here") + return false, fmt.Errorf("handling genesis block header is not supported here") } if err := header.Validate(); err != nil { - return fmt.Errorf("invalid block header: %w", err) + return false, fmt.Errorf("invalid block header: %w", err) } if IsDirtyHash(header.Hash) { - return fmt.Errorf("cannot add block header with dirty hash: %s", header.Hash) + return false, fmt.Errorf("cannot add block header with dirty hash: %s", header.Hash) } lastBulk, lastBulkChainWork, err := s.bulkMgr.LastHeader() if err != nil { - return fmt.Errorf("failed to get last bulk header: %w", err) + return false, fmt.Errorf("failed to get last bulk header: %w", err) } if lastBulk == nil { - return fmt.Errorf("no bulk headers available to validate against") + return false, fmt.Errorf("no bulk headers available to validate against") } if header.Height <= lastBulk.Height { s.logger.Info("Chaintracks service - skipping storage of live header already present in bulk storage", slog.String("header_hash", header.Hash), slog.Any("header_height", header.Height)) - return nil + return false, nil } q := s.storage.Query(ctx) @@ -509,20 +542,20 @@ func (s *Service) storeLiveHeader(ctx context.Context, header wdk.ChainBlockHead }() if exists, err := q.LiveHeaderExists(header.Hash); err != nil { - return fmt.Errorf("failed to check existing header: %w", err) + return false, fmt.Errorf("failed to check existing header: %w", err) } else if exists { s.logger.Debug("Chaintracks service - header already exists, skipping", slog.String("header_hash", header.Hash)) - return nil + return false, nil } oneBack, err := q.GetLiveHeaderByHash(header.PreviousHash) if err != nil { - return fmt.Errorf("failed to get previous header: %w", err) + return false, fmt.Errorf("failed to get previous header: %w", err) } if oneBack == nil { if count, err := q.CountLiveHeaders(); err != nil { - return fmt.Errorf("failed to count live headers: %w", err) + return false, fmt.Errorf("failed to count live headers: %w", err) } else if count == 0 { // No live headers yet, check if this header connects directly to last bulk if header.PreviousHash == lastBulk.Hash && header.Height == lastBulk.Height+1 { @@ -536,18 +569,18 @@ func (s *Service) storeLiveHeader(ctx context.Context, header wdk.ChainBlockHead IsChainTip: true, IsActive: true, }); err != nil { - return fmt.Errorf("failed to insert new live header: %w", err) + return false, fmt.Errorf("failed to insert new live header: %w", err) } - return nil + return true, nil } } - return errNoPrev + return false, errNoPrev } if oneBack.Height+1 != header.Height { - return fmt.Errorf("header height mismatch: expected %d, got %d", oneBack.Height+1, header.Height) + return false, fmt.Errorf("header height mismatch: expected %d, got %d", oneBack.Height+1, header.Height) } var priorTip *models.LiveBlockHeader @@ -556,17 +589,17 @@ func (s *Service) storeLiveHeader(ctx context.Context, header wdk.ChainBlockHead } else { priorTip, err = q.GetActiveTipLiveHeader() if err != nil { - return fmt.Errorf("failed to get active tip header: %w", err) + return false, fmt.Errorf("failed to get active tip header: %w", err) } if priorTip == nil { - return fmt.Errorf("active tip header not found for hash: %s", oneBack.Hash) + return false, fmt.Errorf("active tip header not found for hash: %s", oneBack.Hash) } } oneBackChainWork, err := internal.ChainWorkFromHex(oneBack.ChainWork) if err != nil { - return fmt.Errorf("failed to parse chain work from previous header: %w", err) + return false, fmt.Errorf("failed to parse chain work from previous header: %w", err) } headerChainWork := internal.ChainWorkFromBits(header.Bits) @@ -575,7 +608,7 @@ func (s *Service) storeLiveHeader(ctx context.Context, header wdk.ChainBlockHead priorTipChainWork, err := internal.ChainWorkFromHex(priorTip.ChainWork) if err != nil { - return fmt.Errorf("failed to parse chain work from prior tip header: %w", err) + return false, fmt.Errorf("failed to parse chain work from prior tip header: %w", err) } isActiveTip := chainWork.CmpChainWork(priorTipChainWork) > 0 @@ -588,36 +621,38 @@ func (s *Service) storeLiveHeader(ctx context.Context, header wdk.ChainBlockHead previousHash := activeAncestor.PreviousHash activeAncestor, err = q.GetLiveHeaderByHash(previousHash) if err != nil { - return fmt.Errorf("failed to get active ancestor header: %w", err) + return false, fmt.Errorf("failed to get active ancestor header: %w", err) } if activeAncestor == nil { - return fmt.Errorf("active ancestor header not found for hash: %s", previousHash) + return false, fmt.Errorf("active ancestor header not found for hash: %s", previousHash) } } - // TODO: Calculate reorg depth - but this is not needed yet - used only by reorg callbacks - if activeAncestor.HeaderID != oneBack.HeaderID { s.logger.Info("Chaintracks service - chain reorganization detected", slog.String("new_tip_hash", header.Hash), slog.Any("new_tip_height", header.Height), slog.String("active_ancestor_hash", activeAncestor.Hash), slog.Any("active_ancestor_height", activeAncestor.Height)) // deactivate headers from the current active chain tip up to but excluding our activeAncestor: if err := s.setActiveRecursivelyUntilReachAncestor(q, false, priorTip, activeAncestor); err != nil { - return fmt.Errorf("failed to deactivate headers during reorg: %w", err) + return false, fmt.Errorf("failed to deactivate headers during reorg: %w", err) } // the first header to activate is one before the one we are about to insert // headers are activated until we reach the active ancestor if err := s.setActiveRecursivelyUntilReachAncestor(q, true, oneBack, activeAncestor); err != nil { - return fmt.Errorf("failed to activate headers during reorg: %w", err) + return false, fmt.Errorf("failed to activate headers during reorg: %w", err) } + s.reorgCallbacks.Publish(models.ReorgEvent{ + NewTip: header, + OldTip: priorTip.ChainBlockHeader, + }) } } if oneBack.IsChainTip { if err := q.SetChainTipByID(oneBack.HeaderID, false); err != nil { - return fmt.Errorf("failed to unset prior chain tip: %w", err) + return false, fmt.Errorf("failed to unset prior chain tip: %w", err) } } @@ -628,12 +663,67 @@ func (s *Service) storeLiveHeader(ctx context.Context, header wdk.ChainBlockHead IsChainTip: isActiveTip, IsActive: isActiveTip, }); err != nil { - return fmt.Errorf("failed to insert new live header: %w", err) + return false, fmt.Errorf("failed to insert new live header: %w", err) + } + + return true, nil +} + +func (s *Service) migrateLiveToBulk(ctx context.Context) (err error) { + q := s.storage.Query(ctx) + q.Begin() + defer func() { + if err != nil { + rollbackErr := q.Rollback() + if rollbackErr != nil { + err = fmt.Errorf("failed to rollback transaction after error: %s; original error: %w", rollbackErr.Error(), err) + } + } else { + err = q.Commit() + if err != nil { + err = fmt.Errorf("failed to commit transaction: %w", err) + } + } + }() + + liveRange, err := q.FindLiveHeightRange() + if err != nil { + return fmt.Errorf("failed to find live height range during migration: %w", err) + } + + if liveRange.IsEmpty() { + s.logger.Debug("No live headers to migrate to bulk storage") + return nil } - // TODO: Prune live block headers + count := liveRange.MaxHeight - liveRange.MinHeight + 1 + if count <= s.config.LiveHeightThreshold { + s.logger.Debug("Not enough live headers to migrate to bulk storage", slog.Any("live_header_count", count)) + return nil + } + thresholdHeight := liveRange.MaxHeight - s.config.LiveHeightThreshold + + headers, err := q.FindHeadersForHeightLessThanOrEqualSorted(thresholdHeight, must.ConvertToIntFromUnsigned(s.config.LiveHeightThreshold)) + if err != nil { + return fmt.Errorf("failed to find live headers for migration: %w", err) + } + + if len(headers) == 0 { + s.logger.Debug("No live headers found for migration to bulk storage") + return nil + } + + err = s.bulkMgr.MigrateFromLiveHeaders(ctx, headers) + if err != nil { + return fmt.Errorf("failed to migrate live headers to bulk storage: %w", err) + } + + err = q.DeleteLiveHeadersByIDs(slices.Map(headers, func(h *models.LiveBlockHeader) uint { return h.HeaderID })) + if err != nil { + return fmt.Errorf("failed to delete migrated live headers: %w", err) + } - // TODO: trigger callbacks when implemented + s.logger.Info("Migrated live headers to bulk storage", slog.Any("migrated_header_count", len(headers)), slog.Any("up_to_height", thresholdHeight)) return nil } diff --git a/pkg/services/chaintracks/gormstorage/storage_queries.go b/pkg/services/chaintracks/gormstorage/storage_queries.go index 4d1e9591..9e268b0c 100644 --- a/pkg/services/chaintracks/gormstorage/storage_queries.go +++ b/pkg/services/chaintracks/gormstorage/storage_queries.go @@ -124,6 +124,36 @@ func (i *storageQueries) SetActiveByID(id uint, isActive bool) error { return nil } +func (i *storageQueries) FindHeadersForHeightLessThanOrEqualSorted(height uint, limit int) ([]*models.LiveBlockHeader, error) { + table := i.getQuery().ChaintracksLiveHeader + modelsList, err := table. + Where(table.Height.Lte(height)). + Order(table.Height.Asc()). + Limit(limit). + Find() + if err != nil { + return nil, fmt.Errorf("failed to find headers for height less than or equal: %w", err) + } + + result := make([]*models.LiveBlockHeader, 0, len(modelsList)) + for _, model := range modelsList { + result = append(result, mapLiveHeader(model)) + } + + return result, nil +} + +func (i *storageQueries) DeleteLiveHeadersByIDs(ids []uint) error { + table := i.getQuery().ChaintracksLiveHeader + _, err := table. + Where(table.HeaderID.In(ids...)). + Delete() + if err != nil { + return fmt.Errorf("failed to delete live headers by IDs: %w", err) + } + return nil +} + func (i *storageQueries) InsertNewLiveHeader(header *models.LiveBlockHeader) error { table := i.getQuery().ChaintracksLiveHeader err := table.Create(&dbmodels.ChaintracksLiveHeader{ diff --git a/pkg/services/chaintracks/internal/cacheable_with_ttl.go b/pkg/services/chaintracks/internal/cacheable_with_ttl.go index d7bea0a6..2c964536 100644 --- a/pkg/services/chaintracks/internal/cacheable_with_ttl.go +++ b/pkg/services/chaintracks/internal/cacheable_with_ttl.go @@ -52,6 +52,13 @@ func (c *CacheableWithTTL[T]) Get(ctx context.Context) (T, error) { return *c.value, nil } +func (c *CacheableWithTTL[T]) Invalidate() { + c.locker.Lock() + defer c.locker.Unlock() + c.value = nil + c.lastSet = nil +} + func (c *CacheableWithTTL[T]) readOnValid() *T { c.locker.RLock() defer c.locker.RUnlock() diff --git a/pkg/services/chaintracks/internal/pub_sub_events.go b/pkg/services/chaintracks/internal/pub_sub_events.go new file mode 100644 index 00000000..f5f919e2 --- /dev/null +++ b/pkg/services/chaintracks/internal/pub_sub_events.go @@ -0,0 +1,53 @@ +package internal + +import ( + "log/slog" + "sync" + + "github.com/bsv-blockchain/go-wallet-toolbox/pkg/internal/logging" +) + +type PubSubEvents[T any] struct { + logger *slog.Logger + subscribers map[chan T]struct{} + mutex sync.RWMutex +} + +func NewPubSubEvents[T any](logger *slog.Logger) *PubSubEvents[T] { + return &PubSubEvents[T]{ + logger: logging.Child(logger, "pub_sub_events"), + subscribers: make(map[chan T]struct{}), + } +} + +func (s *PubSubEvents[T]) Subscribe() (readOnlyChan <-chan T, unsubscribe func()) { + s.mutex.Lock() + defer s.mutex.Unlock() + + // Create a buffered channel to prevent immediate blocking + ch := make(chan T, 10) + s.subscribers[ch] = struct{}{} + + readOnlyChan = ch + unsubscribe = func() { + s.mutex.Lock() + defer s.mutex.Unlock() + + delete(s.subscribers, ch) + close(ch) + } + return +} + +func (s *PubSubEvents[T]) Publish(event T) { + s.mutex.RLock() + defer s.mutex.RUnlock() + + for ch := range s.subscribers { + // Non-blocking send to prevent blocking if the channel is full + select { + case ch <- event: + default: + } + } +} diff --git a/pkg/services/chaintracks/models/reorg_event.go b/pkg/services/chaintracks/models/reorg_event.go new file mode 100644 index 00000000..f89966fd --- /dev/null +++ b/pkg/services/chaintracks/models/reorg_event.go @@ -0,0 +1,9 @@ +package models + +import "github.com/bsv-blockchain/go-wallet-toolbox/pkg/wdk" + +// ReorgEvent represents a chain reorganization event involving a change from one chain tip to another. +type ReorgEvent struct { + OldTip wdk.ChainBlockHeader + NewTip wdk.ChainBlockHeader +} diff --git a/pkg/services/chaintracks/models/storage_queries.go b/pkg/services/chaintracks/models/storage_queries.go index 809944f1..d71a53f3 100644 --- a/pkg/services/chaintracks/models/storage_queries.go +++ b/pkg/services/chaintracks/models/storage_queries.go @@ -17,4 +17,6 @@ type StorageQueries interface { CountLiveHeaders() (int64, error) GetLiveHeaderByHeight(height uint) (*LiveBlockHeader, error) FindLiveHeightRange() (HeightRange, error) + FindHeadersForHeightLessThanOrEqualSorted(height uint, limit int) ([]*LiveBlockHeader, error) + DeleteLiveHeadersByIDs(ids []uint) error }