Skip to content
40 changes: 40 additions & 0 deletions cmd/chaintracks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,47 @@ func main() {
panic(err)
}

listenForTipHeaders(server, ctx)
listenForReorgs(server, ctx)

if err := server.ListenAndServe(ctx); err != nil {
panic(err)
}
}

func listenForTipHeaders(server *chaintracks.Server, ctx context.Context) {
newTipHeadersChan, unsubscribe := server.Service.SubscribeHeaders()

go func() {
for {
select {
case header := <-newTipHeadersChan:
slog.Default().Info("New tip header received", slog.Uint64("height", uint64(header.Height)), slog.String("hash", header.Hash))
case <-ctx.Done():
unsubscribe()
return
}
}
}()
}

func listenForReorgs(server *chaintracks.Server, ctx context.Context) {
reorgChan, unsubscribe := server.Service.SubscribeReorgs()

go func() {
for {
select {
case reorg := <-reorgChan:
slog.Default().Info("Reorg detected",
slog.Uint64("new_tip_height", uint64(reorg.NewTip.Height)),
slog.String("new_tip_hash", reorg.NewTip.Hash),
slog.Uint64("old_tip_height", uint64(reorg.OldTip.Height)),
slog.String("old_tip_hash", reorg.OldTip.Hash),
)
case <-ctx.Done():
unsubscribe()
return
}
}
}()
}
11 changes: 10 additions & 1 deletion pkg/defs/chaintracks.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type ChaintracksServiceConfig struct {
LiveIngestors []LiveIngestorType `mapstructure:"live_ingestors"`
BulkIngestors []BulkIngestorConfig `mapstructure:"bulk_ingestors"`
WocAPIKey string `mapstructure:"woc_api_key"`

AddLiveRecursionLimit uint `mapstructure:"add_live_recursion_limit"`
LiveHeightThreshold uint `mapstructure:"live_height_threshold"`
}

// Validate checks if the Chain field in ChaintracksServiceConfig holds a valid BSV network type.
Expand Down Expand Up @@ -52,6 +55,10 @@ func (c *ChaintracksServiceConfig) Validate() error {
}
}

if c.AddLiveRecursionLimit > 100 || c.AddLiveRecursionLimit > c.LiveHeightThreshold {
return fmt.Errorf("add_live_recursion_limit must be less than or equal to live_height_threshold and not exceed 100")
}

return nil
}

Expand All @@ -73,7 +80,9 @@ func DefaultChaintracksServiceConfig() ChaintracksServiceConfig {
Type: WhatsOnChainCDN,
},
},
WocAPIKey: "",
WocAPIKey: "",
AddLiveRecursionLimit: 10,
LiveHeightThreshold: 2000,
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/services/chaintracks/bulk_headers_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (b *bulkHeadersContainer) MaxHeightAtChunk(index int) int {
func (b *bulkHeadersContainer) FindHeaderForHeight(height uint) (*wdk.ChainBlockHeader, error) {
chunkIndex := b.getIndexForHeight(must.ConvertToIntFromUnsigned(height))
if chunkIndex >= len(b.chunks) {
return nil, nil
return nil, nil // not found
}

chunk := b.chunks[chunkIndex]
Expand All @@ -147,7 +147,7 @@ func (b *bulkHeadersContainer) FindHeaderForHeight(height uint) (*wdk.ChainBlock
headerDataEnd := headerDataStart + 80

if headerDataEnd > len(chunk.data) {
return nil, fmt.Errorf("header data end index %d exceeds chunk data length %d", headerDataEnd, len(chunk.data))
return nil, nil // not found
}

headerData := chunk.data[headerDataStart:headerDataEnd]
Expand Down
28 changes: 26 additions & 2 deletions pkg/services/chaintracks/bulk_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
}
}

func (bm *bulkManager) SyncBulkStorage(ctx context.Context, presentHeight uint, initialRanges models.HeightRanges) error {
func (bm *bulkManager) SyncBulkStorage(ctx context.Context, presentHeight uint, initialRanges models.HeightRanges, liveHeightThreshold uint) error {
if presentHeight <= liveHeightThreshold {
bm.logger.Info("Skipping bulk synchronization - present height below live height threshold", slog.Any("present_height", presentHeight), slog.Any("live_height_threshold", liveHeightThreshold))
return nil
Expand Down Expand Up @@ -114,6 +114,30 @@
return bm.container.GetFileDataByIndex(fileID)
}

func (bm *bulkManager) MigrateFromLiveHeaders(ctx context.Context, liveHeaders []*models.LiveBlockHeader) error {
bm.locker.Lock()
defer bm.locker.Unlock()

// create data slice
data := make([]byte, 0, len(liveHeaders)*80)
for _, header := range liveHeaders {
headerBytes, err := header.Bytes()
if err != nil {
return fmt.Errorf("failed to convert live header at height %d to bytes: %w", header.Height, err)
}
data = append(data, headerBytes...)
}

// add to container
heightRange := models.NewHeightRange(liveHeaders[0].Height, liveHeaders[len(liveHeaders)-1].Height)
err := bm.container.Add(ctx, data, heightRange)
if err != nil {
return fmt.Errorf("failed to add live headers to bulk container: %w", err)
}

return nil
}

func (bm *bulkManager) processBulkChunks(ctx context.Context, bulkChunks []ingest.BulkHeaderMinimumInfo, downloader ingest.BulkFileDownloader, maxHeight uint) error {
chunksToLoad := bm.getChunksToLoad(bulkChunks)
type chunkWithInfo struct {
Expand Down Expand Up @@ -180,7 +204,7 @@
return !rangeToAdd.IsEmpty()
}

func (bm *bulkManager) GetGapHeadersAsLive(ctx context.Context, presentHeight uint, liveInitialRange models.HeightRange) ([]wdk.ChainBlockHeader, error) {
func (bm *bulkManager) GetGapHeadersAsLive(ctx context.Context, presentHeight uint, liveInitialRange models.HeightRange, addLiveRecursionLimit uint) ([]wdk.ChainBlockHeader, error) {

Check failure on line 207 in pkg/services/chaintracks/bulk_manager.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 33 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=bsv-blockchain_go-wallet-toolbox&issues=AZrE-A_F9rBd7WQFZu3k&open=AZrE-A_F9rBd7WQFZu3k&pullRequest=699

Check warning on line 207 in pkg/services/chaintracks/bulk_manager.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the 'Get' prefix from this function name.

See more on https://sonarcloud.io/project/issues?id=bsv-blockchain_go-wallet-toolbox&issues=AZrE-A_F9rBd7WQFZu3j&open=AZrE-A_F9rBd7WQFZu3j&pullRequest=699
var newLiveHeaders []wdk.ChainBlockHeader
maxBulkHeight := bm.GetHeightRange().MaxHeight
minLiveHeight := presentHeight
Expand Down
17 changes: 17 additions & 0 deletions pkg/services/chaintracks/chaintracks_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package chaintracks
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/bsv-blockchain/go-wallet-toolbox/pkg/internal/logging"
servercommon "github.com/bsv-blockchain/go-wallet-toolbox/pkg/internal/server"
"github.com/bsv-blockchain/go-wallet-toolbox/pkg/services/chaintracks/models"
"github.com/bsv-blockchain/go-wallet-toolbox/pkg/wdk"
"github.com/go-softwarelab/common/pkg/to"
)

Expand Down Expand Up @@ -119,6 +121,11 @@ func (h *Handler) handleFindChainTipHeader(w http.ResponseWriter, r *http.Reques
w.Header().Set("Content-Type", "application/json")

tipHeader, err := h.service.FindChainTipHeader(r.Context())
if errors.Is(err, wdk.ErrNotFoundError) {
http.Error(w, "Chain tip not found", http.StatusNotFound)
return
}

if err != nil {
h.logger.Error("failed to find chain tip header hex", slog.String("error", err.Error()))
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
Expand All @@ -137,6 +144,11 @@ func (h *Handler) handleFindTipHashHex(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")

tipHash, err := h.service.FindChainTipHeader(r.Context())
if errors.Is(err, wdk.ErrNotFoundError) {
http.Error(w, "Chain tip not found", http.StatusNotFound)
return
}

if err != nil {
h.logger.Error("failed to find chain tip hash hex", slog.String("error", err.Error()))
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
Expand Down Expand Up @@ -167,6 +179,11 @@ func (h *Handler) handleFindHeaderHexForHeight(w http.ResponseWriter, r *http.Re
}

header, err := h.service.FindHeaderForHeight(r.Context(), height)
if errors.Is(err, wdk.ErrNotFoundError) {
http.Error(w, "Header not found for the specified height", http.StatusNotFound)
return
}

if err != nil {
h.logger.Error("failed to find header hex for height", slog.String("error", err.Error()))
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
Expand Down
Loading
Loading