Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/defs/chaintracks.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ type ChaintracksServiceConfig struct {
Chain BSVNetwork `mapstructure:"-"`
LiveIngestors []LiveIngestorType `mapstructure:"live_ingestors"`
CDNBulkIngestors []CDNBulkIngestorConfig `mapstructure:"cdn_bulk_ingestors"`

// TODO: Specify API key for WoC ingestor
WocAPIKey string `mapstructure:"woc_api_key"`
}

// Validate checks if the Chain field in ChaintracksServiceConfig holds a valid BSV network type.
Expand Down Expand Up @@ -68,6 +67,7 @@ func DefaultChaintracksServiceConfig() ChaintracksServiceConfig {
SourceURL: BabbageBlockHeadersCDN,
},
},
WocAPIKey: "",
}
}

Expand Down
22 changes: 0 additions & 22 deletions pkg/defs/woc_poll_ingestor.go

This file was deleted.

22 changes: 19 additions & 3 deletions pkg/services/chaintracks/bulk_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,16 @@ func newBulkManager(logger *slog.Logger, bulkIngestors []NamedBulkIngestor) *bul
}
}

func (bm *bulkManager) SyncBulkStorage(ctx context.Context, presentHeight uint, initialRanges models.HeightRanges) (err error) {
func (bm *bulkManager) SyncBulkStorage(ctx context.Context, presentHeight uint, initialRanges models.HeightRanges) error {
bm.logger.Info("Starting bulk synchronization", slog.Any("present_height", presentHeight), slog.Any("initial_ranges", initialRanges))

missingRange := models.NewHeightRange(0, presentHeight)
for _, ingestor := range bm.bulkIngestors {
bulkChunks, downloader, err := ingestor.Ingestor.Synchronize(ctx, presentHeight, initialRanges)
if missingRange.IsEmpty() {
break
}

bulkChunks, downloader, err := ingestor.Ingestor.Synchronize(ctx, presentHeight, missingRange)
if err != nil {
bm.logger.Error("Chaintracks service - error during bulk synchronization", slog.String("ingestor_name", ingestor.Name), slog.String("error", err.Error()))
return fmt.Errorf("bulk synchronization failed for ingestor %s: %w", ingestor.Name, err)
Expand All @@ -46,7 +51,18 @@ func (bm *bulkManager) SyncBulkStorage(ctx context.Context, presentHeight uint,
return fmt.Errorf("failed to process bulk chunks from ingestor %s: %w", ingestor.Name, err)
}

// TODO: Implement DONE check and break if done
providedRange := models.NewEmptyHeightRange()
for _, chunk := range bulkChunks {
providedRange, err = providedRange.Union(chunk.ToHeightRange())
if err != nil {
return fmt.Errorf("failed to compute provided height range from ingestor %s: %w", ingestor.Name, err)
}
}

missingRange, err = missingRange.Subtract(providedRange)
if err != nil {
return fmt.Errorf("failed to compute missing height range after ingestor %s: %w", ingestor.Name, err)
}
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/services/chaintracks/chaintracks_initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Initializers struct {
func DefaultInitializers() Initializers {
return Initializers{
WOCLiveIngestorPollFactory: func(logger *slog.Logger, config defs.ChaintracksServiceConfig) LiveIngestor {
return ingest.NewLiveIngestorWocPoll(logger, defs.WOCPollIngestorConfig{Chain: config.Chain})
return ingest.NewLiveIngestorWocPoll(logger, config.Chain, ingest.IngestorWocPollOpts.WithAPIKey(config.WocAPIKey))
},
CDNBulkIngestorFactory: func(logger *slog.Logger, chain defs.BSVNetwork, config defs.CDNBulkIngestorConfig) BulkIngestor {
return ingest.NewBulkIngestorCDN(logger, chain, config)
Expand Down
2 changes: 1 addition & 1 deletion pkg/services/chaintracks/chaintracks_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestService_GetPresentHeight(t *testing.T) {
// and:
service, err := chaintracks.NewService(logging.NewTestLogger(t), config, chaintracks.Initializers{
WOCLiveIngestorPollFactory: func(logger *slog.Logger, config defs.ChaintracksServiceConfig) chaintracks.LiveIngestor {
return ingest.NewLiveIngestorWocPoll(logger, defs.WOCPollIngestorConfig{Chain: config.Chain}, ingest.WithRestyClient(mockWOC.HttpClient()))
return ingest.NewLiveIngestorWocPoll(logger, config.Chain, ingest.IngestorWocPollOpts.WithRestyClient(mockWOC.HttpClient()))
},
})
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/services/chaintracks/chaintracks_storage.interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type NamedLiveIngestor struct {
// The Synchronize method ingests headers up to the given presentHeight for provided height ranges and returns insertion results.
// TODO: refine return type from 'any' to a more specific type representing synchronization results.
type BulkIngestor interface {
Synchronize(ctx context.Context, presentHeight uint, ranges models.HeightRanges) ([]ingest.BulkHeaderFileInfo, ingest.BulkFileDownloader, error)
Synchronize(ctx context.Context, presentHeight uint, rangeToFetch models.HeightRange) ([]ingest.BulkHeaderFileInfo, ingest.BulkFileDownloader, error)
}

// NamedBulkIngestor associates a descriptive name with a BulkIngestor interface for organized bulk header synchronization tasks.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type BulkFileDownloader = func(ctx context.Context, fileInfo BulkHeaderFileInfo)

// Synchronize retrieves available bulk header files for the configured BSV network and prepares chunks for ingestion.
// It validates file metadata, checks network consistency, and returns a list of chunked header information for sync.
func (b *BulkIngestorCDN) Synchronize(ctx context.Context, presentHeight uint, ranges models.HeightRanges) ([]BulkHeaderFileInfo, BulkFileDownloader, error) {
func (b *BulkIngestorCDN) Synchronize(ctx context.Context, presentHeight uint, rangeToFetch models.HeightRange) ([]BulkHeaderFileInfo, BulkFileDownloader, error) {
// TODO: PresentHeight and ranges are not used in TS implementation, consider using them for optimization

filesInfo, err := b.reader.FetchBulkHeaderFilesInfo(ctx, b.chain)
Expand Down
210 changes: 210 additions & 0 deletions pkg/services/chaintracks/ingest/bulk_ingestor_woc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package ingest

import (
"context"
"fmt"
"log/slog"
"strings"
"time"

"github.com/bsv-blockchain/go-wallet-toolbox/pkg/defs"
"github.com/bsv-blockchain/go-wallet-toolbox/pkg/internal/logging"
"github.com/bsv-blockchain/go-wallet-toolbox/pkg/services/chaintracks/models"
"github.com/go-softwarelab/common/pkg/must"
"github.com/go-softwarelab/common/pkg/to"
)

// BulkIngestorWOC provides logic to ingest and synchronize block headers from WhatsOnChain bulk endpoints.
// Utilizes a wocClient to fetch block headers and block height resources from the WhatsOnChain API service.
// Maintains a logger for structured logging and a chain identifier for selecting network-specific resources.
// Designed for efficient bulk fetching of header file metadata and incremental synchronization of chain state.
type BulkIngestorWOC struct {
logger *slog.Logger
chain defs.BSVNetwork
wocClient *wocClient
}

// NewBulkIngestorWOC creates a new BulkIngestorWOC for a given logger, network, and optional configuration options.
// It sets up a dedicated WhatsOnChain bulk client for the specified BSV network and uses the provided logger.
// Optional configuration options allow customization such as API key or overriding the default HTTP client factory.
// Returns a pointer to the BulkIngestorWOC which can efficiently ingest and synchronize block header files.
func NewBulkIngestorWOC(logger *slog.Logger, chain defs.BSVNetwork, opts ...func(options *BulkIngestorWocOptions)) *BulkIngestorWOC {
logger = logging.Child(logger, "bulk_ingestor_woc")

options := to.OptionsWithDefault(DefaultBulkIngestorWocOptions(), opts...)

return &BulkIngestorWOC{
logger: logger,
chain: chain,
wocClient: newWocClient(logger, chain, options.APIKey, options.RestyClientFactory.New()),
}
}

// Synchronize fetches available bulk header files and selects those overlapping the specified height range.
// Synchronize returns metadata for the required files and a downloader for retrieving their data from WhatsOnChain.
// Synchronize returns an error if fetching or parsing file metadata fails, or if no appropriate files are found.
func (b *BulkIngestorWOC) Synchronize(ctx context.Context, presentHeight uint, rangeToFetch models.HeightRange) ([]BulkHeaderFileInfo, BulkFileDownloader, error) {
allFiles, err := b.fetchBulkHeaderFilesInfo(ctx)
if err != nil {
return nil, nil, fmt.Errorf("failed to fetch bulk header files info: %w", err)
}

if len(allFiles) == 0 {
return nil, nil, fmt.Errorf("no bulk header files available from WhatsOnChain")
}

neededFiles := make([]wocBulkFileInfo, 0)
for _, file := range allFiles {
if file.heightRange.Overlaps(rangeToFetch) {
neededFiles = append(neededFiles, file)
}
}

result := make([]BulkHeaderFileInfo, 0, len(neededFiles))
for _, file := range neededFiles {
bulkFileInfo, err := b.toBulkHeaderFileInfo(ctx, &file)
if err != nil {
return nil, nil, fmt.Errorf("failed to convert to BulkHeaderFileInfo for file %s: %w", file.filename, err)
}

result = append(result, *bulkFileInfo)
}

return result, b.bulkFileDownloader(), nil

}

func (b *BulkIngestorWOC) toBulkHeaderFileInfo(ctx context.Context, file *wocBulkFileInfo) (*BulkHeaderFileInfo, error) {
prevChainWork := prevChainWorkForGenesis
prevHash := genesisAsPrevBlockHash
if file.heightRange.MinHeight > 0 {
prevBlock, err := b.wocClient.GetBlockByHeight(ctx, file.heightRange.MinHeight-1)
if err != nil {
return nil, fmt.Errorf("failed to get previous block at height %d: %w", file.heightRange.MinHeight-1, err)
}

prevChainWork = prevBlock.Chainwork
prevHash = prevBlock.Hash
}

lastBlock, err := b.wocClient.GetBlockByHeight(ctx, file.heightRange.MaxHeight)
if err != nil {
return nil, fmt.Errorf("failed to get last block at height %d: %w", file.heightRange.MaxHeight, err)
}

return &BulkHeaderFileInfo{
FileName: fmt.Sprintf("%d_%d_headers.bin", file.heightRange.MinHeight, file.heightRange.MaxHeight),
FirstHeight: file.heightRange.MinHeight,
Count: must.ConvertToIntFromUnsigned(file.heightRange.MaxHeight) - must.ConvertToIntFromUnsigned(file.heightRange.MinHeight) + 1,
Chain: b.chain,
SourceURL: to.Ptr(file.url),

PrevChainWork: prevChainWork,
PrevHash: prevHash,

LastChainWork: lastBlock.Chainwork,
LastHash: &lastBlock.Hash,

// Not supported, we don't download the file at this point and WoC doesn't provide it in metadata
FileHash: nil,
}, nil
}

func (b *BulkIngestorWOC) bulkFileDownloader() BulkFileDownloader {
return func(ctx context.Context, fileInfo BulkHeaderFileInfo) (BulkFileData, error) {
if fileInfo.SourceURL == nil {
panic("SourceURL is nil in bulk file downloader")
}

b.logger.Info("Downloading bulk header file", slog.String("file_name", fileInfo.FileName))

content, err := b.wocClient.DownloadHeaderFile(ctx, *fileInfo.SourceURL)
if err != nil {
return BulkFileData{}, fmt.Errorf("failed to download bulk header file %s: %w", fileInfo.FileName, err)
}

return BulkFileData{
Info: fileInfo,
Data: content,
AccessedAt: time.Now(),
}, nil
}
}

type wocBulkFileInfo struct {
heightRange models.HeightRange
url string
filename string
}

func (b *BulkIngestorWOC) fetchBulkHeaderFilesInfo(ctx context.Context) ([]wocBulkFileInfo, error) {
response, err := b.wocClient.GetHeadersResourceList(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get headers resource list from WhatsOnChain: %w", err)
}

result := make([]wocBulkFileInfo, 0, len(response.Files))
for _, fileURL := range response.Files {
filename, heightRange, err := b.parseURL(ctx, fileURL)
if err != nil {
return nil, fmt.Errorf("failed to parse height range from URL %s: %w", fileURL, err)
}

result = append(result, wocBulkFileInfo{
heightRange: heightRange,
url: fileURL,
filename: filename,
})
}

return result, nil
}

// parseURL parses the height range from the given WhatsOnChain bulk header file URL.
// "https://api.whatsonchain.com/v1/bsv/main/block/headers/0_10000_headers.bin",
// "https://api.whatsonchain.com/v1/bsv/main/block/headers/10001_20000_headers.bin",
// (...)
// "https://api.whatsonchain.com/v1/bsv/main/block/headers/latest"
// The latest endpoint - we don't know the max height by URL alone; the min height is previous max + 1
// So we need to get the Content-Disposition header from the HEAD request to get the actual filename
func (b *BulkIngestorWOC) parseURL(ctx context.Context, url string) (filename string, heightRange models.HeightRange, err error) {
parts := strings.Split(url, "/block/headers/")
if len(parts) != 2 {
err = fmt.Errorf("invalid URL format: %s", url)
return
}
filename = parts[1]

if filename == "latest" {
filename, err = b.getLatestHeightRange(ctx, url)
if err != nil {
err = fmt.Errorf("failed to get latest height range from URL %s: %w", url, err)
return
}
}

_, err = fmt.Sscanf(filename, "%d_%d_headers.bin", &heightRange.MinHeight, &heightRange.MaxHeight)
if err != nil {
err = fmt.Errorf("failed to parse height range from filename %s: %w", filename, err)
return
}

return
}

// getLatestHeightRange performs a HEAD request to the given latest URL to retrieve the Content-Disposition header.
// It extracts the filename from the header to determine the actual height range of the latest bulk header
func (b *BulkIngestorWOC) getLatestHeightRange(ctx context.Context, latestURL string) (string, error) {
contentHeader, err := b.wocClient.GetContentDispositionFilename(ctx, latestURL)
if err != nil {
return "", fmt.Errorf("failed to get Content-Disposition header from WhatsOnChain: %w", err)
}

// example: Content-Disposition: attachment; filename=922001_923532_headers.bin
var filename string
if _, err = fmt.Sscanf(contentHeader, "attachment; filename=%s", &filename); err != nil {
return "", fmt.Errorf("failed to parse filename from Content-Disposition header: %w", err)
}

return filename, nil
}
40 changes: 40 additions & 0 deletions pkg/services/chaintracks/ingest/bulk_ingestor_woc_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package ingest

import (
"github.com/bsv-blockchain/go-wallet-toolbox/pkg/services/internal/httpx"
"github.com/go-resty/resty/v2"
)

// BulkIngestorWocOptions provides configuration for bulk ingestion using a configurable HTTP client and optional API key.
type BulkIngestorWocOptions struct {
RestyClientFactory *httpx.RestyClientFactory
APIKey string
}

// DefaultBulkIngestorWocOptions returns the default BulkIngestorWocOptions with a configured RestyClientFactory.
func DefaultBulkIngestorWocOptions() BulkIngestorWocOptions {
return BulkIngestorWocOptions{
RestyClientFactory: httpx.NewRestyClientFactory(),
}
}

// BulkIngestorWocOptionsBuilder provides builder methods to configure BulkIngestorWocOptions for bulk ingestion.
type BulkIngestorWocOptionsBuilder struct{}

// BulkIngestorWocOpts provides option builder methods for customizing BulkIngestorWocOptions configuration.
var BulkIngestorWocOpts BulkIngestorWocOptionsBuilder

// WithRestyClient sets a custom resty.Client to be used for HTTP requests in BulkIngestorWocOptions.
// It overrides the default RestyClientFactory with one based on the provided client instance.
func (BulkIngestorWocOptionsBuilder) WithRestyClient(client *resty.Client) func(*BulkIngestorWocOptions) {
return func(options *BulkIngestorWocOptions) {
options.RestyClientFactory = httpx.NewRestyClientFactoryWithBase(client)
}
}

// WithAPIKey sets the API key to be used with the BulkIngestorWocOptions instance.
func (BulkIngestorWocOptionsBuilder) WithAPIKey(apiKey string) func(*BulkIngestorWocOptions) {
return func(options *BulkIngestorWocOptions) {
options.APIKey = apiKey
}
}
22 changes: 22 additions & 0 deletions pkg/services/chaintracks/ingest/bulk_ingestor_woc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package ingest

import (
"testing"

"github.com/bsv-blockchain/go-wallet-toolbox/pkg/defs"
"github.com/bsv-blockchain/go-wallet-toolbox/pkg/internal/logging"
"github.com/bsv-blockchain/go-wallet-toolbox/pkg/services/chaintracks/models"
"github.com/stretchr/testify/require"
)

func TestBulkIngestorWOC_Synchronize(t *testing.T) {
t.Skip("This test gets actual data from WOC - use this only for manual testing purposes")
service := NewBulkIngestorWOC(logging.NewTestLogger(t), defs.NetworkMainnet)

presentHeight := uint(923537)
rangeToLoad := presentHeight - 4000
fileInfo, _, err := service.Synchronize(t.Context(), presentHeight, models.NewHeightRange(rangeToLoad, presentHeight))

require.NoError(t, err)
t.Logf("Fetched file info: %+v", fileInfo)
}
Loading
Loading