From 06f2143a0681b658b265b0e2062bbcf61cefaf6c Mon Sep 17 00:00:00 2001 From: Krzysztof Tomecki <152964795+chris-4chain@users.noreply.github.com> Date: Mon, 17 Nov 2025 13:39:10 +0100 Subject: [PATCH 1/4] feat: implement BulkIngestorWOC for WhatsOnChain integration and refactor synchronization logic --- pkg/defs/chaintracks.go | 4 +- pkg/defs/woc_poll_ingestor.go | 22 -- pkg/services/chaintracks/bulk_manager.go | 19 +- .../chaintracks/chaintracks_initializers.go | 2 +- .../chaintracks/chaintracks_service_test.go | 2 +- .../chaintracks_storage.interface.go | 2 +- ..._bulk_ingestor.go => bulk_ingestor_cdn.go} | 2 +- .../chaintracks/ingest/bulk_ingestor_woc.go | 199 ++++++++++++++++++ .../ingest/bulk_ingestor_woc_options.go | 33 +++ .../ingest/bulk_ingestor_woc_test.go | 22 ++ .../{chaintracks_cdn_data.go => cdn_data.go} | 5 + ...haintracks_cdn_reader.go => cdn_reader.go} | 0 ..._cdn_reader_test.go => cdn_reader_test.go} | 0 .../ingest/chaintraks_woc_client.go | 179 ++++++++++++++++ .../ingest/live_ingestor_woc_poll.go | 85 ++------ .../ingest/live_ingestor_woc_poll_options.go | 39 ++-- .../ingest/live_ingestor_woc_poll_test.go | 43 ++-- pkg/services/chaintracks/ingest/woc_dto.go | 5 + .../chaintracks/models/height_range.go | 5 + 19 files changed, 526 insertions(+), 142 deletions(-) delete mode 100644 pkg/defs/woc_poll_ingestor.go rename pkg/services/chaintracks/ingest/{chaintracks_cdn_bulk_ingestor.go => bulk_ingestor_cdn.go} (97%) create mode 100644 pkg/services/chaintracks/ingest/bulk_ingestor_woc.go create mode 100644 pkg/services/chaintracks/ingest/bulk_ingestor_woc_options.go create mode 100644 pkg/services/chaintracks/ingest/bulk_ingestor_woc_test.go rename pkg/services/chaintracks/ingest/{chaintracks_cdn_data.go => cdn_data.go} (95%) rename pkg/services/chaintracks/ingest/{chaintracks_cdn_reader.go => cdn_reader.go} (100%) rename pkg/services/chaintracks/ingest/{chaintracks_cdn_reader_test.go => cdn_reader_test.go} (100%) create mode 100644 pkg/services/chaintracks/ingest/chaintraks_woc_client.go diff --git a/pkg/defs/chaintracks.go b/pkg/defs/chaintracks.go index 46fb15b2..04028108 100644 --- a/pkg/defs/chaintracks.go +++ b/pkg/defs/chaintracks.go @@ -24,8 +24,7 @@ type ChaintracksServiceConfig struct { Chain BSVNetwork `mapstructure:"-"` LiveIngestors []LiveIngestorType `mapstructure:"live_ingestors"` CDNBulkIngestors []CDNBulkIngestorConfig `mapstructure:"cdn_bulk_ingestors"` - - // TODO: Specify API key for WoC ingestor + WocAPIKey string `mapstructure:"woc_api_key"` } // Validate checks if the Chain field in ChaintracksServiceConfig holds a valid BSV network type. @@ -68,6 +67,7 @@ func DefaultChaintracksServiceConfig() ChaintracksServiceConfig { SourceURL: BabbageBlockHeadersCDN, }, }, + WocAPIKey: "", } } diff --git a/pkg/defs/woc_poll_ingestor.go b/pkg/defs/woc_poll_ingestor.go deleted file mode 100644 index 2bd17cb9..00000000 --- a/pkg/defs/woc_poll_ingestor.go +++ /dev/null @@ -1,22 +0,0 @@ -package defs - -// WOCPollIngestorConfig holds configuration for polling data from WhatsOnChain, including chain network and API key. -type WOCPollIngestorConfig struct { - Chain BSVNetwork `mapstructure:"-"` - APIKey string `mapstructure:"api_key"` -} - -// Validate checks if the WOCPollIngestorConfig has valid configuration and returns an error if validation fails. -func (c *WOCPollIngestorConfig) Validate() error { - if err := c.Chain.Validate(); err != nil { - return err - } - return nil -} - -// DefaultWOCPollIngestorConfig returns a WOCPollIngestorConfig preconfigured for Bitcoin SV mainnet network. -func DefaultWOCPollIngestorConfig() WOCPollIngestorConfig { - return WOCPollIngestorConfig{ - Chain: NetworkMainnet, - } -} diff --git a/pkg/services/chaintracks/bulk_manager.go b/pkg/services/chaintracks/bulk_manager.go index a5d5a015..ad63aec7 100644 --- a/pkg/services/chaintracks/bulk_manager.go +++ b/pkg/services/chaintracks/bulk_manager.go @@ -35,8 +35,12 @@ func newBulkManager(logger *slog.Logger, bulkIngestors []NamedBulkIngestor) *bul func (bm *bulkManager) SyncBulkStorage(ctx context.Context, presentHeight uint, initialRanges models.HeightRanges) (err error) { bm.logger.Info("Starting bulk synchronization", slog.Any("present_height", presentHeight), slog.Any("initial_ranges", initialRanges)) + missingRange := models.NewHeightRange(0, presentHeight) for _, ingestor := range bm.bulkIngestors { - bulkChunks, downloader, err := ingestor.Ingestor.Synchronize(ctx, presentHeight, initialRanges) + if missingRange.IsEmpty() { + break + } + bulkChunks, downloader, err := ingestor.Ingestor.Synchronize(ctx, presentHeight, missingRange) if err != nil { bm.logger.Error("Chaintracks service - error during bulk synchronization", slog.String("ingestor_name", ingestor.Name), slog.String("error", err.Error())) return fmt.Errorf("bulk synchronization failed for ingestor %s: %w", ingestor.Name, err) @@ -46,6 +50,19 @@ func (bm *bulkManager) SyncBulkStorage(ctx context.Context, presentHeight uint, return fmt.Errorf("failed to process bulk chunks from ingestor %s: %w", ingestor.Name, err) } + providedRange := models.NewEmptyHeightRange() + for _, chunk := range bulkChunks { + providedRange, err = providedRange.Union(chunk.ToHeightRange()) + if err != nil { + return fmt.Errorf("failed to compute provided height range from ingestor %s: %w", ingestor.Name, err) + } + } + + missingRange, err = missingRange.Subtract(providedRange) + if err != nil { + return fmt.Errorf("failed to compute missing height range after ingestor %s: %w", ingestor.Name, err) + } + // TODO: Implement DONE check and break if done } diff --git a/pkg/services/chaintracks/chaintracks_initializers.go b/pkg/services/chaintracks/chaintracks_initializers.go index f1deccbd..02ef846d 100644 --- a/pkg/services/chaintracks/chaintracks_initializers.go +++ b/pkg/services/chaintracks/chaintracks_initializers.go @@ -18,7 +18,7 @@ type Initializers struct { func DefaultInitializers() Initializers { return Initializers{ WOCLiveIngestorPollFactory: func(logger *slog.Logger, config defs.ChaintracksServiceConfig) LiveIngestor { - return ingest.NewLiveIngestorWocPoll(logger, defs.WOCPollIngestorConfig{Chain: config.Chain}) + return ingest.NewLiveIngestorWocPoll(logger, config.Chain, ingest.IngestorWocPollOpts.WithAPIKey(config.WocAPIKey)) }, CDNBulkIngestorFactory: func(logger *slog.Logger, chain defs.BSVNetwork, config defs.CDNBulkIngestorConfig) BulkIngestor { return ingest.NewBulkIngestorCDN(logger, chain, config) diff --git a/pkg/services/chaintracks/chaintracks_service_test.go b/pkg/services/chaintracks/chaintracks_service_test.go index 3bf97016..9b771e1e 100644 --- a/pkg/services/chaintracks/chaintracks_service_test.go +++ b/pkg/services/chaintracks/chaintracks_service_test.go @@ -61,7 +61,7 @@ func TestService_GetPresentHeight(t *testing.T) { // and: service, err := chaintracks.NewService(logging.NewTestLogger(t), config, chaintracks.Initializers{ WOCLiveIngestorPollFactory: func(logger *slog.Logger, config defs.ChaintracksServiceConfig) chaintracks.LiveIngestor { - return ingest.NewLiveIngestorWocPoll(logger, defs.WOCPollIngestorConfig{Chain: config.Chain}, ingest.WithRestyClient(mockWOC.HttpClient())) + return ingest.NewLiveIngestorWocPoll(logger, config.Chain, ingest.IngestorWocPollOpts.WithRestyClient(mockWOC.HttpClient())) }, }) require.NoError(t, err) diff --git a/pkg/services/chaintracks/chaintracks_storage.interface.go b/pkg/services/chaintracks/chaintracks_storage.interface.go index a312ab24..f3301e27 100644 --- a/pkg/services/chaintracks/chaintracks_storage.interface.go +++ b/pkg/services/chaintracks/chaintracks_storage.interface.go @@ -36,7 +36,7 @@ type NamedLiveIngestor struct { // The Synchronize method ingests headers up to the given presentHeight for provided height ranges and returns insertion results. // TODO: refine return type from 'any' to a more specific type representing synchronization results. type BulkIngestor interface { - Synchronize(ctx context.Context, presentHeight uint, ranges models.HeightRanges) ([]ingest.BulkHeaderFileInfo, ingest.BulkFileDownloader, error) + Synchronize(ctx context.Context, presentHeight uint, rangeToFetch models.HeightRange) ([]ingest.BulkHeaderFileInfo, ingest.BulkFileDownloader, error) } // NamedBulkIngestor associates a descriptive name with a BulkIngestor interface for organized bulk header synchronization tasks. diff --git a/pkg/services/chaintracks/ingest/chaintracks_cdn_bulk_ingestor.go b/pkg/services/chaintracks/ingest/bulk_ingestor_cdn.go similarity index 97% rename from pkg/services/chaintracks/ingest/chaintracks_cdn_bulk_ingestor.go rename to pkg/services/chaintracks/ingest/bulk_ingestor_cdn.go index 4dcb23f6..4dea3ac7 100644 --- a/pkg/services/chaintracks/ingest/chaintracks_cdn_bulk_ingestor.go +++ b/pkg/services/chaintracks/ingest/bulk_ingestor_cdn.go @@ -39,7 +39,7 @@ type BulkFileDownloader = func(ctx context.Context, fileInfo BulkHeaderFileInfo) // Synchronize retrieves available bulk header files for the configured BSV network and prepares chunks for ingestion. // It validates file metadata, checks network consistency, and returns a list of chunked header information for sync. -func (b *BulkIngestorCDN) Synchronize(ctx context.Context, presentHeight uint, ranges models.HeightRanges) ([]BulkHeaderFileInfo, BulkFileDownloader, error) { +func (b *BulkIngestorCDN) Synchronize(ctx context.Context, presentHeight uint, rangeToFetch models.HeightRange) ([]BulkHeaderFileInfo, BulkFileDownloader, error) { // TODO: PresentHeight and ranges are not used in TS implementation, consider using them for optimization filesInfo, err := b.reader.FetchBulkHeaderFilesInfo(ctx, b.chain) diff --git a/pkg/services/chaintracks/ingest/bulk_ingestor_woc.go b/pkg/services/chaintracks/ingest/bulk_ingestor_woc.go new file mode 100644 index 00000000..99ac9dd8 --- /dev/null +++ b/pkg/services/chaintracks/ingest/bulk_ingestor_woc.go @@ -0,0 +1,199 @@ +package ingest + +import ( + "context" + "fmt" + "log/slog" + "strings" + "time" + + "github.com/bsv-blockchain/go-wallet-toolbox/pkg/defs" + "github.com/bsv-blockchain/go-wallet-toolbox/pkg/internal/logging" + "github.com/bsv-blockchain/go-wallet-toolbox/pkg/services/chaintracks/models" + "github.com/go-softwarelab/common/pkg/must" + "github.com/go-softwarelab/common/pkg/to" +) + +type BulkIngestorWOC struct { + logger *slog.Logger + chain defs.BSVNetwork + wocClient *wocClient +} + +func NewBulkIngestorWOC(logger *slog.Logger, chain defs.BSVNetwork, opts ...func(options *BulkIngestorWocOptions)) *BulkIngestorWOC { + logger = logging.Child(logger, "bulk_ingestor_woc") + + options := to.OptionsWithDefault(DefaultBulkIngestorWocOptions(), opts...) + + return &BulkIngestorWOC{ + logger: logger, + chain: chain, + wocClient: newWocClient(logger, chain, options.APIKey, options.RestyClientFactory.New()), + } +} + +func (b *BulkIngestorWOC) Synchronize(ctx context.Context, presentHeight uint, rangeToFetch models.HeightRange) ([]BulkHeaderFileInfo, BulkFileDownloader, error) { + allFiles, err := b.fetchBulkHeaderFilesInfo(ctx) + if err != nil { + return nil, nil, fmt.Errorf("failed to fetch bulk header files info: %w", err) + } + + if len(allFiles) == 0 { + return nil, nil, fmt.Errorf("no bulk header files available from WhatsOnChain") + } + + neededFiles := make([]wocBulkFileInfo, 0) + for _, file := range allFiles { + if file.heightRange.Overlaps(rangeToFetch) { + neededFiles = append(neededFiles, file) + } + } + + result := make([]BulkHeaderFileInfo, 0, len(neededFiles)) + for _, file := range neededFiles { + bulkFileInfo, err := b.toBulkHeaderFileInfo(ctx, &file) + if err != nil { + return nil, nil, fmt.Errorf("failed to convert to BulkHeaderFileInfo for file %s: %w", file.filename, err) + } + + result = append(result, *bulkFileInfo) + } + + return result, b.bulkFileDownloader(), nil + +} + +func (b *BulkIngestorWOC) toBulkHeaderFileInfo(ctx context.Context, file *wocBulkFileInfo) (*BulkHeaderFileInfo, error) { + prevChainWork := prevChainWorkForGenesis + prevHash := genesisAsPrevBlockHash + if file.heightRange.MinHeight > 0 { + prevBlock, err := b.wocClient.GetBlockByHeight(ctx, file.heightRange.MinHeight-1) + if err != nil { + return nil, fmt.Errorf("failed to get previous block at height %d: %w", file.heightRange.MinHeight-1, err) + } + + prevChainWork = prevBlock.Chainwork + prevHash = prevBlock.Hash + } + + lastBlock, err := b.wocClient.GetBlockByHeight(ctx, file.heightRange.MaxHeight) + if err != nil { + return nil, fmt.Errorf("failed to get last block at height %d: %w", file.heightRange.MaxHeight, err) + } + + return &BulkHeaderFileInfo{ + FileName: fmt.Sprintf("%d_%d_headers.bin", file.heightRange.MinHeight, file.heightRange.MaxHeight), + FirstHeight: file.heightRange.MinHeight, + Count: must.ConvertToIntFromUnsigned(file.heightRange.MaxHeight) - must.ConvertToIntFromUnsigned(file.heightRange.MinHeight) + 1, + Chain: b.chain, + SourceURL: to.Ptr(file.url), + + PrevChainWork: prevChainWork, + PrevHash: prevHash, + + LastChainWork: lastBlock.Chainwork, + LastHash: &lastBlock.Hash, + + // Not supported, we don't download the file at this point and WoC doesn't provide it in metadata + FileHash: nil, + }, nil +} + +func (b *BulkIngestorWOC) bulkFileDownloader() BulkFileDownloader { + return func(ctx context.Context, fileInfo BulkHeaderFileInfo) (BulkFileData, error) { + if fileInfo.SourceURL == nil { + panic("SourceURL is nil in bulk file downloader") + } + + b.logger.Info("Downloading bulk header file", slog.String("file_name", fileInfo.FileName)) + + content, err := b.wocClient.DownloadHeaderFile(ctx, *fileInfo.SourceURL) + if err != nil { + return BulkFileData{}, fmt.Errorf("failed to download bulk header file %s: %w", fileInfo.FileName, err) + } + + return BulkFileData{ + Info: fileInfo, + Data: content, + AccessedAt: time.Now(), + }, nil + } +} + +type wocBulkFileInfo struct { + heightRange models.HeightRange + url string + filename string +} + +func (b *BulkIngestorWOC) fetchBulkHeaderFilesInfo(ctx context.Context) ([]wocBulkFileInfo, error) { + response, err := b.wocClient.GetHeadersResourceList(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get headers resource list from WhatsOnChain: %w", err) + } + + result := make([]wocBulkFileInfo, 0, len(response.Files)) + for _, fileURL := range response.Files { + filename, heightRange, err := b.parseURL(ctx, fileURL) + if err != nil { + return nil, fmt.Errorf("failed to parse height range from URL %s: %w", fileURL, err) + } + + result = append(result, wocBulkFileInfo{ + heightRange: heightRange, + url: fileURL, + filename: filename, + }) + } + + return result, nil +} + +// parseURL parses the height range from the given WhatsOnChain bulk header file URL. +// "https://api.whatsonchain.com/v1/bsv/main/block/headers/0_10000_headers.bin", +// "https://api.whatsonchain.com/v1/bsv/main/block/headers/10001_20000_headers.bin", +// (...) +// "https://api.whatsonchain.com/v1/bsv/main/block/headers/latest" +// The latest endpoint - we don't know the max height by URL alone; the min height is previous max + 1 +// So we need to get the Content-Disposition header from the HEAD request to get the actual filename +func (b *BulkIngestorWOC) parseURL(ctx context.Context, url string) (filename string, heightRange models.HeightRange, err error) { + parts := strings.Split(url, "/block/headers/") + if len(parts) != 2 { + err = fmt.Errorf("invalid URL format: %s", url) + return + } + filename = parts[1] + + if filename == "latest" { + filename, err = b.getLatestHeightRange(ctx, url) + if err != nil { + err = fmt.Errorf("failed to get latest height range from URL %s: %w", url, err) + return + } + } + + _, err = fmt.Sscanf(filename, "%d_%d_headers.bin", &heightRange.MinHeight, &heightRange.MaxHeight) + if err != nil { + err = fmt.Errorf("failed to parse height range from filename %s: %w", filename, err) + return + } + + return +} + +// getRedirectURLForLatest doesn't follow redirects, instead it fetches the Location header from the 302 response +// this is needed to get the height range from the redirected URL +func (b *BulkIngestorWOC) getLatestHeightRange(ctx context.Context, latestURL string) (string, error) { + contentHeader, err := b.wocClient.GetContentDispositionFilename(ctx, latestURL) + if err != nil { + return "", fmt.Errorf("failed to get Content-Disposition header from WhatsOnChain: %w", err) + } + + // example: Content-Disposition: attachment; filename=922001_923532_headers.bin + var filename string + if _, err = fmt.Sscanf(contentHeader, "attachment; filename=%s", &filename); err != nil { + return "", fmt.Errorf("failed to parse filename from Content-Disposition header: %w", err) + } + + return filename, nil +} diff --git a/pkg/services/chaintracks/ingest/bulk_ingestor_woc_options.go b/pkg/services/chaintracks/ingest/bulk_ingestor_woc_options.go new file mode 100644 index 00000000..a7780eed --- /dev/null +++ b/pkg/services/chaintracks/ingest/bulk_ingestor_woc_options.go @@ -0,0 +1,33 @@ +package ingest + +import ( + "github.com/bsv-blockchain/go-wallet-toolbox/pkg/services/internal/httpx" + "github.com/go-resty/resty/v2" +) + +type BulkIngestorWocOptions struct { + RestyClientFactory *httpx.RestyClientFactory + APIKey string +} + +func DefaultBulkIngestorWocOptions() BulkIngestorWocOptions { + return BulkIngestorWocOptions{ + RestyClientFactory: httpx.NewRestyClientFactory(), + } +} + +type BulkIngestorWocOptionsBuilder struct{} + +var BulkIngestorWocOpts BulkIngestorWocOptionsBuilder + +func (BulkIngestorWocOptionsBuilder) WithRestyClient(client *resty.Client) func(*BulkIngestorWocOptions) { + return func(options *BulkIngestorWocOptions) { + options.RestyClientFactory = httpx.NewRestyClientFactoryWithBase(client) + } +} + +func (BulkIngestorWocOptionsBuilder) WithAPIKey(apiKey string) func(*BulkIngestorWocOptions) { + return func(options *BulkIngestorWocOptions) { + options.APIKey = apiKey + } +} diff --git a/pkg/services/chaintracks/ingest/bulk_ingestor_woc_test.go b/pkg/services/chaintracks/ingest/bulk_ingestor_woc_test.go new file mode 100644 index 00000000..c1abb5f0 --- /dev/null +++ b/pkg/services/chaintracks/ingest/bulk_ingestor_woc_test.go @@ -0,0 +1,22 @@ +package ingest + +import ( + "testing" + + "github.com/bsv-blockchain/go-wallet-toolbox/pkg/defs" + "github.com/bsv-blockchain/go-wallet-toolbox/pkg/internal/logging" + "github.com/bsv-blockchain/go-wallet-toolbox/pkg/services/chaintracks/models" + "github.com/stretchr/testify/require" +) + +func TestBulkIngestorWOC_Synchronize(t *testing.T) { + t.Skip("This test gets actual data from WOC - use this only for manual testing purposes") + service := NewBulkIngestorWOC(logging.NewTestLogger(t), defs.NetworkMainnet) + + presentHeight := uint(923537) + rangeToLoad := presentHeight - 4000 + fileInfo, _, err := service.Synchronize(t.Context(), presentHeight, models.NewHeightRange(rangeToLoad, presentHeight)) + + require.NoError(t, err) + t.Logf("Fetched file info: %+v", fileInfo) +} diff --git a/pkg/services/chaintracks/ingest/chaintracks_cdn_data.go b/pkg/services/chaintracks/ingest/cdn_data.go similarity index 95% rename from pkg/services/chaintracks/ingest/chaintracks_cdn_data.go rename to pkg/services/chaintracks/ingest/cdn_data.go index c335fb69..c4a3f5b4 100644 --- a/pkg/services/chaintracks/ingest/chaintracks_cdn_data.go +++ b/pkg/services/chaintracks/ingest/cdn_data.go @@ -8,6 +8,7 @@ import ( crypto "github.com/bsv-blockchain/go-sdk/primitives/hash" "github.com/bsv-blockchain/go-wallet-toolbox/pkg/defs" + "github.com/bsv-blockchain/go-wallet-toolbox/pkg/services/chaintracks/models" "github.com/bsv-blockchain/go-wallet-toolbox/pkg/wdk" "github.com/go-softwarelab/common/pkg/must" ) @@ -46,6 +47,10 @@ func (b *BulkHeaderFileInfo) Equals(other *BulkHeaderFileInfo) bool { } +func (b *BulkHeaderFileInfo) ToHeightRange() models.HeightRange { + return models.NewHeightRange(b.FirstHeight, b.FirstHeight+must.ConvertToUInt(b.Count)-1) +} + // BulkFileData represents a complete bulk block header file and its metadata for a specific blockchain network. type BulkFileData struct { Info BulkHeaderFileInfo diff --git a/pkg/services/chaintracks/ingest/chaintracks_cdn_reader.go b/pkg/services/chaintracks/ingest/cdn_reader.go similarity index 100% rename from pkg/services/chaintracks/ingest/chaintracks_cdn_reader.go rename to pkg/services/chaintracks/ingest/cdn_reader.go diff --git a/pkg/services/chaintracks/ingest/chaintracks_cdn_reader_test.go b/pkg/services/chaintracks/ingest/cdn_reader_test.go similarity index 100% rename from pkg/services/chaintracks/ingest/chaintracks_cdn_reader_test.go rename to pkg/services/chaintracks/ingest/cdn_reader_test.go diff --git a/pkg/services/chaintracks/ingest/chaintraks_woc_client.go b/pkg/services/chaintracks/ingest/chaintraks_woc_client.go new file mode 100644 index 00000000..f95d73b8 --- /dev/null +++ b/pkg/services/chaintracks/ingest/chaintraks_woc_client.go @@ -0,0 +1,179 @@ +package ingest + +import ( + "context" + "fmt" + "log/slog" + "net/http" + + "github.com/bsv-blockchain/go-wallet-toolbox/pkg/defs" + "github.com/bsv-blockchain/go-wallet-toolbox/pkg/internal/logging" + "github.com/bsv-blockchain/go-wallet-toolbox/pkg/services/internal/httpx" + "github.com/bsv-blockchain/go-wallet-toolbox/pkg/services/internal/whatsonchain" + "github.com/bsv-blockchain/go-wallet-toolbox/pkg/wdk" + "github.com/go-resty/resty/v2" +) + +type wocClient struct { + logger *slog.Logger + resty *resty.Client +} + +func newWocClient(logger *slog.Logger, chain defs.BSVNetwork, apiKey string, restyClient *resty.Client) *wocClient { + url, err := whatsonchain.MakeBaseURL(chain) + if err != nil { + panic(fmt.Sprintf("failed to build base URL for WhatsOnChain: %s", err.Error())) + } + + headers := httpx.NewHeaders(). + AcceptJSON(). + ContentTypeJSON(). + UserAgent().Value("go-wallet-toolbox"). + Authorization().IfNotEmpty(apiKey) + + restyClient = restyClient. + SetHeaders(headers). + SetLogger(logging.RestyAdapter(logger)). + SetDebug(logging.IsDebug(logger)). + SetBaseURL(url) + + return &wocClient{ + logger: logger, + resty: restyClient, + } +} + +func (c *wocClient) GetHeaderByHash(ctx context.Context, hash string) (*WOCBlockHeaderDTO, error) { + path := fmt.Sprintf("/block/%s/header", hash) + + var hdrResp WOCBlockHeaderDTO + res, err := c.resty.R(). + SetContext(ctx). + SetResult(&hdrResp). + Get(path) + + if err != nil { + return nil, fmt.Errorf("failed to fetch block header: %w", err) + } + if res.StatusCode() != http.StatusOK { + if res.StatusCode() == http.StatusNotFound { + return nil, fmt.Errorf("block header not found for hash %s: %w", hash, wdk.ErrNotFoundError) + } + return nil, fmt.Errorf("unexpected status code %d fetching block header", res.StatusCode()) + } + + if hdrResp.PrevBlock == "" { + hdrResp.PrevBlock = genesisAsPrevBlockHash + } + + return &hdrResp, nil +} + +func (c *wocClient) GetBlockByHeight(ctx context.Context, height uint) (*WOCBlockHeaderDTO, error) { + path := fmt.Sprintf("/block/height/%d", height) + + var blockResp WOCBlockHeaderDTO + res, err := c.resty.R(). + SetContext(ctx). + SetResult(&blockResp). + Get(path) + + if err != nil { + return nil, fmt.Errorf("failed to fetch block by height: %w", err) + } + if res.StatusCode() != http.StatusOK { + return nil, fmt.Errorf("unexpected status code %d fetching block by height", res.StatusCode()) + } + + return &blockResp, nil +} + +func (c *wocClient) GetPresentHeight(ctx context.Context) (uint, error) { + path := "/chain/info" + + var infoResp blockOnlyChainInfoDTO + res, err := c.resty.R(). + SetContext(ctx). + SetResult(&infoResp). + Get(path) + + if err != nil { + return 0, fmt.Errorf("failed to fetch chain info: %w", err) + } + if res.StatusCode() != http.StatusOK { + return 0, fmt.Errorf("unexpected status code %d fetching chain info", res.StatusCode()) + } + + return infoResp.Blocks, nil +} + +func (c *wocClient) GetLastHeaders(ctx context.Context) (WOCBlockHeadersDTO, error) { + path := "/block/headers" + + var headersResponse WOCBlockHeadersDTO + res, err := c.resty.R(). + SetContext(ctx). + SetResult(&headersResponse). + Get(path) + + if err != nil { + return nil, fmt.Errorf("failed to fetch block headers: %w", err) + } + if res.StatusCode() != http.StatusOK { + return nil, fmt.Errorf("unexpected status code %d fetching block headers", res.StatusCode()) + } + + return headersResponse, nil +} + +func (c *wocClient) GetHeadersResourceList(ctx context.Context) (*WOCHeadersResourcesDTO, error) { + path := "/block/headers/resources" + + var response *WOCHeadersResourcesDTO + resp, err := c.resty.R(). + SetContext(ctx). + SetResult(&response). + Get(path) + + if err != nil { + return nil, fmt.Errorf("failed to fetch bulk header files info: %w", err) + } + + if !resp.IsSuccess() { + return nil, fmt.Errorf("failed to fetch bulk header files info: received status code %d", resp.StatusCode()) + } + + if response == nil || len(response.Files) == 0 { + return nil, fmt.Errorf("failed to fetch bulk header files info: empty response") + } + + return response, nil +} + +func (c *wocClient) GetContentDispositionFilename(ctx context.Context, fileURL string) (string, error) { + resp, err := c.resty.R(). + SetContext(ctx). + Head(fileURL) + if err != nil { + return "", fmt.Errorf("failed to fetch latest bulk header file URL: %w", err) + } + + return resp.Header().Get("Content-Disposition"), nil +} + +func (c *wocClient) DownloadHeaderFile(ctx context.Context, fileURL string) ([]byte, error) { + resp, err := c.resty.R(). + SetContext(ctx). + SetDebug(false). //NOTE: Disable debug for large binary downloads + SetHeaders(httpx.NewHeaders().Accept().Value("application/octet-stream")). + Get(fileURL) + if err != nil { + return nil, fmt.Errorf("failed to download bulk header file: %w", err) + } + + if !resp.IsSuccess() { + return nil, fmt.Errorf("failed to download bulk header file: received status code %d", resp.StatusCode()) + } + + return resp.Body(), nil +} diff --git a/pkg/services/chaintracks/ingest/live_ingestor_woc_poll.go b/pkg/services/chaintracks/ingest/live_ingestor_woc_poll.go index 68d13cd4..6bdfebfc 100644 --- a/pkg/services/chaintracks/ingest/live_ingestor_woc_poll.go +++ b/pkg/services/chaintracks/ingest/live_ingestor_woc_poll.go @@ -4,17 +4,13 @@ import ( "context" "fmt" "log/slog" - "net/http" "slices" "sync" "time" "github.com/bsv-blockchain/go-wallet-toolbox/pkg/defs" "github.com/bsv-blockchain/go-wallet-toolbox/pkg/internal/logging" - "github.com/bsv-blockchain/go-wallet-toolbox/pkg/services/internal/httpx" - "github.com/bsv-blockchain/go-wallet-toolbox/pkg/services/internal/whatsonchain" "github.com/bsv-blockchain/go-wallet-toolbox/pkg/wdk" - "github.com/go-resty/resty/v2" "github.com/go-softwarelab/common/pkg/to" "github.com/golang/groupcache/lru" ) @@ -23,9 +19,8 @@ const cachedEntries = 500 // LiveIngestorWocPoll provides functionality for polling block header data from an external source, such as WhatsOnChain. type LiveIngestorWocPoll struct { - logger *slog.Logger - config defs.WOCPollIngestorConfig - resty *resty.Client + logger *slog.Logger + wocClient *wocClient ctx context.Context cancelCtx context.CancelFunc @@ -42,33 +37,14 @@ type LiveIngestorWocPoll struct { // It initializes a Resty HTTP client configured with default headers, user agent, and API key authorization if set. // Panics if the WhatsOnChain base URL cannot be built for the specified chain network in the config. // Returns a pointer to the initialized LiveIngestorWocPoll struct, ready for external data polling operations. -func NewLiveIngestorWocPoll(logger *slog.Logger, config defs.WOCPollIngestorConfig, opts ...func(options *ClientOptions)) *LiveIngestorWocPoll { +func NewLiveIngestorWocPoll(logger *slog.Logger, chain defs.BSVNetwork, opts ...func(*IngestorWocPollOptions)) *LiveIngestorWocPoll { logger = logging.Child(logger, "live_ingestor_woc_poll") - options := to.OptionsWithDefault(DefaultClientOptions(), opts...) - - url, err := whatsonchain.MakeBaseURL(config.Chain) - if err != nil { - panic(fmt.Sprintf("failed to build base URL for WhatsOnChain: %s", err.Error())) - } - - restyClient := options.RestyClientFactory.New() - headers := httpx.NewHeaders(). - AcceptJSON(). - ContentTypeJSON(). - UserAgent().Value("go-wallet-toolbox"). - Authorization().IfNotEmpty(config.APIKey) - - restyClient = restyClient. - SetHeaders(headers). - SetLogger(logging.RestyAdapter(logger)). - SetDebug(logging.IsDebug(logger)). - SetBaseURL(url) + options := to.OptionsWithDefault(DefaultIngestorWocPollOptions(), opts...) return &LiveIngestorWocPoll{ logger: logger, - config: config, - resty: restyClient, + wocClient: newWocClient(logger, chain, options.APIKey, options.RestyClientFactory.New()), syncPeriod: options.SyncPeriod, cached: lru.New(cachedEntries), } @@ -80,26 +56,9 @@ func NewLiveIngestorWocPoll(logger *slog.Logger, config defs.WOCPollIngestorConf // The hash parameter must be a valid block hash as a hex string. // PreviousHash is set to a predefined value if the block is the genesis block. func (ing *LiveIngestorWocPoll) GetHeaderByHash(ctx context.Context, hash string) (*wdk.ChainBlockHeader, error) { - path := fmt.Sprintf("/block/%s/header", hash) - - var hdrResp WOCBlockHeaderDTO - res, err := ing.resty.R(). - SetContext(ctx). - SetResult(&hdrResp). - Get(path) - + hdrResp, err := ing.wocClient.GetHeaderByHash(ctx, hash) if err != nil { - return nil, fmt.Errorf("failed to fetch block header: %w", err) - } - if res.StatusCode() != http.StatusOK { - if res.StatusCode() == http.StatusNotFound { - return nil, fmt.Errorf("block header not found for hash %s: %w", hash, wdk.ErrNotFoundError) - } - return nil, fmt.Errorf("unexpected status code %d fetching block header", res.StatusCode()) - } - - if hdrResp.PrevBlock == "" { - hdrResp.PrevBlock = genesisAsPrevBlockHash + return nil, fmt.Errorf("failed to fetch block header by hash %s: %w", hash, err) } wdkBlockHeader, err := hdrResp.ToWDK() @@ -113,22 +72,12 @@ func (ing *LiveIngestorWocPoll) GetHeaderByHash(ctx context.Context, hash string // GetPresentHeight retrieves the current blockchain height from the external data source. // Returns the number of blocks in the chain or an error if the info cannot be fetched or parsed. func (ing *LiveIngestorWocPoll) GetPresentHeight(ctx context.Context) (uint, error) { - path := "/chain/info" - - var infoResp blockOnlyChainInfoDTO - res, err := ing.resty.R(). - SetContext(ctx). - SetResult(&infoResp). - Get(path) - + blocks, err := ing.wocClient.GetPresentHeight(ctx) if err != nil { - return 0, fmt.Errorf("failed to fetch chain info: %w", err) - } - if res.StatusCode() != http.StatusOK { - return 0, fmt.Errorf("unexpected status code %d fetching chain info", res.StatusCode()) + return 0, fmt.Errorf("failed to fetch present height: %w", err) } - return infoResp.Blocks, nil + return blocks, nil } // StartListening begins polling for new block headers and sends them to respChan until the parent context is canceled. @@ -219,19 +168,9 @@ func (ing *LiveIngestorWocPoll) StopListening() { // getLastHeaders normally fetches the last 10 block headers from the external data source. func (ing *LiveIngestorWocPoll) getLastHeaders(ctx context.Context) ([]*wdk.ChainBlockHeader, error) { - path := "/block/headers" - - var headersResponse WOCBlockHeadersDTO - res, err := ing.resty.R(). - SetContext(ctx). - SetResult(&headersResponse). - Get(path) - + headersResponse, err := ing.wocClient.GetLastHeaders(ctx) if err != nil { - return nil, fmt.Errorf("failed to fetch block headers: %w", err) - } - if res.StatusCode() != http.StatusOK { - return nil, fmt.Errorf("unexpected status code %d fetching block headers", res.StatusCode()) + return nil, fmt.Errorf("failed to fetch last block headers: %w", err) } wdkHeaders, err := headersResponse.ToWDK() diff --git a/pkg/services/chaintracks/ingest/live_ingestor_woc_poll_options.go b/pkg/services/chaintracks/ingest/live_ingestor_woc_poll_options.go index c165e1b0..4f44b04a 100644 --- a/pkg/services/chaintracks/ingest/live_ingestor_woc_poll_options.go +++ b/pkg/services/chaintracks/ingest/live_ingestor_woc_poll_options.go @@ -7,33 +7,36 @@ import ( "github.com/go-resty/resty/v2" ) -// ClientOptions holds optional configuration for customizing client behavior, such as injecting a custom RestyClientFactory. -type ClientOptions struct { +type IngestorWocPollOptions struct { RestyClientFactory *httpx.RestyClientFactory SyncPeriod time.Duration + APIKey string } -// WithRestyClient sets a custom resty.Client for use with ClientOptions, panicking if the provided client is nil. -func WithRestyClient(client *resty.Client) func(*ClientOptions) { - if client == nil { - panic("client cannot be nil") - } - return func(o *ClientOptions) { - o.RestyClientFactory = httpx.NewRestyClientFactoryWithBase(client) +func DefaultIngestorWocPollOptions() IngestorWocPollOptions { + return IngestorWocPollOptions{ + RestyClientFactory: httpx.NewRestyClientFactory(), + SyncPeriod: 60 * time.Second, } } -// WithSyncPeriod sets a custom sync period for use with ClientOptions. -func WithSyncPeriod(period time.Duration) func(*ClientOptions) { - return func(o *ClientOptions) { - o.SyncPeriod = period +type IngestorWocPollOptionsBuilder struct{} + +var IngestorWocPollOpts IngestorWocPollOptionsBuilder + +func (IngestorWocPollOptionsBuilder) WithRestyClient(client *resty.Client) func(*IngestorWocPollOptions) { + return func(options *IngestorWocPollOptions) { + options.RestyClientFactory = httpx.NewRestyClientFactoryWithBase(client) } } -// DefaultClientOptions returns a ClientOptions struct initialized with default RestyClientFactory and SyncPeriod values. -func DefaultClientOptions() ClientOptions { - return ClientOptions{ - RestyClientFactory: httpx.NewRestyClientFactory(), - SyncPeriod: 60 * time.Second, +func (IngestorWocPollOptionsBuilder) WithSyncPeriod(period time.Duration) func(*IngestorWocPollOptions) { + return func(options *IngestorWocPollOptions) { + options.SyncPeriod = period + } +} +func (IngestorWocPollOptionsBuilder) WithAPIKey(apiKey string) func(*IngestorWocPollOptions) { + return func(options *IngestorWocPollOptions) { + options.APIKey = apiKey } } diff --git a/pkg/services/chaintracks/ingest/live_ingestor_woc_poll_test.go b/pkg/services/chaintracks/ingest/live_ingestor_woc_poll_test.go index 82fb1f82..7342a215 100644 --- a/pkg/services/chaintracks/ingest/live_ingestor_woc_poll_test.go +++ b/pkg/services/chaintracks/ingest/live_ingestor_woc_poll_test.go @@ -18,12 +18,12 @@ import ( func TestLiveIngestorWOCPoll_BlockHeaderSuccessfulResp(t *testing.T) { // given: blockHash, responseBodyMap := blockHeaderStandardResponse(t) - config := defs.DefaultWOCPollIngestorConfig() + chain := defs.NetworkMainnet - mockWOC := testabilities.GivenMockWOC(t, config.Chain) + mockWOC := testabilities.GivenMockWOC(t, chain) mockWOC.WillRespondOn(fmt.Sprintf("block/%s/header", blockHash), "GET").WithJSONResponse(200, responseBodyMap) - ingestor := ingest.NewLiveIngestorWocPoll(logging.NewTestLogger(t), config, ingest.WithRestyClient(mockWOC.HttpClient())) + ingestor := ingest.NewLiveIngestorWocPoll(logging.NewTestLogger(t), chain, ingest.IngestorWocPollOpts.WithRestyClient(mockWOC.HttpClient())) // when: resp, err := ingestor.GetHeaderByHash(t.Context(), blockHash) @@ -45,12 +45,12 @@ func TestLiveIngestorWOCPoll_BlockHeaderPrevHashEmpty(t *testing.T) { blockHash, responseBodyMap := blockHeaderStandardResponse(t) responseBodyMap["previousblockhash"] = "" - config := defs.DefaultWOCPollIngestorConfig() + chain := defs.NetworkMainnet - mockWOC := testabilities.GivenMockWOC(t, config.Chain) + mockWOC := testabilities.GivenMockWOC(t, chain) mockWOC.WillRespondOn(fmt.Sprintf("block/%s/header", blockHash), "GET").WithJSONResponse(200, responseBodyMap) - ingestor := ingest.NewLiveIngestorWocPoll(logging.NewTestLogger(t), config, ingest.WithRestyClient(mockWOC.HttpClient())) + ingestor := ingest.NewLiveIngestorWocPoll(logging.NewTestLogger(t), chain, ingest.IngestorWocPollOpts.WithRestyClient(mockWOC.HttpClient())) // when: resp, err := ingestor.GetHeaderByHash(t.Context(), blockHash) @@ -63,12 +63,12 @@ func TestLiveIngestorWOCPoll_BlockHeaderPrevHashEmpty(t *testing.T) { func TestLiveIngestorWOCPoll_BlockHeaderNotFound(t *testing.T) { // given: blockHash, _ := blockHeaderStandardResponse(t) - config := defs.DefaultWOCPollIngestorConfig() + chain := defs.NetworkMainnet - mockWOC := testabilities.GivenMockWOC(t, config.Chain) + mockWOC := testabilities.GivenMockWOC(t, chain) mockWOC.WillRespondOn(fmt.Sprintf("block/%s/header", blockHash), "GET").WithJSONResponse(404, map[string]any{}) - ingestor := ingest.NewLiveIngestorWocPoll(logging.NewTestLogger(t), config, ingest.WithRestyClient(mockWOC.HttpClient())) + ingestor := ingest.NewLiveIngestorWocPoll(logging.NewTestLogger(t), chain, ingest.IngestorWocPollOpts.WithRestyClient(mockWOC.HttpClient())) // when: _, err := ingestor.GetHeaderByHash(t.Context(), blockHash) @@ -80,12 +80,12 @@ func TestLiveIngestorWOCPoll_BlockHeaderNotFound(t *testing.T) { func TestLiveIngestorWOCPoll_PollLast10Headers(t *testing.T) { // given: last10Headers := testabilities.WOCLast10Headers(t) - config := defs.DefaultWOCPollIngestorConfig() + chain := defs.NetworkMainnet - mockWOC := testabilities.GivenMockWOC(t, config.Chain) + mockWOC := testabilities.GivenMockWOC(t, chain) mockWOC.WillRespondOn("block/headers", "GET").WithJSONResponse(200, last10Headers) - ingestor := ingest.NewLiveIngestorWocPoll(logging.NewTestLogger(t), config, ingest.WithRestyClient(mockWOC.HttpClient())) + ingestor := ingest.NewLiveIngestorWocPoll(logging.NewTestLogger(t), chain, ingest.IngestorWocPollOpts.WithRestyClient(mockWOC.HttpClient())) // when: mockReceiver := testabilities.NewMockHeadersReceiver(t) @@ -103,12 +103,12 @@ func TestLiveIngestorWOCPoll_PollLast10Headers(t *testing.T) { func TestLiveIngestorWOCPoll_PollLast10Headers_Twice(t *testing.T) { // given: last10Headers := testabilities.WOCLast10Headers(t) - config := defs.DefaultWOCPollIngestorConfig() + chain := defs.NetworkMainnet - mockWOC := testabilities.GivenMockWOC(t, config.Chain) + mockWOC := testabilities.GivenMockWOC(t, chain) mockWOC.WillRespondOn("block/headers", "GET").WithJSONResponse(200, last10Headers) - ingestor := ingest.NewLiveIngestorWocPoll(logging.NewTestLogger(t), config, ingest.WithRestyClient(mockWOC.HttpClient()), ingest.WithSyncPeriod(100*time.Millisecond)) + ingestor := ingest.NewLiveIngestorWocPoll(logging.NewTestLogger(t), chain, ingest.IngestorWocPollOpts.WithRestyClient(mockWOC.HttpClient()), ingest.IngestorWocPollOpts.WithSyncPeriod(100*time.Millisecond)) // when: mockReceiver := testabilities.NewMockHeadersReceiver(t) @@ -126,14 +126,14 @@ func TestLiveIngestorWOCPoll_PollLast10Headers_Twice(t *testing.T) { func TestLiveIngestorWOCPoll_PollLast10Headers_TemporaryFailsDontBother(t *testing.T) { // given: last10Headers := testabilities.WOCLast10Headers(t) - config := defs.DefaultWOCPollIngestorConfig() + chain := defs.NetworkMainnet - mockWOC := testabilities.GivenMockWOC(t, config.Chain) + mockWOC := testabilities.GivenMockWOC(t, chain) // and first make it fail mockWOC.WillRespondOn("block/headers", "GET").WithJSONResponse(404, map[string]any{}) - ingestor := ingest.NewLiveIngestorWocPoll(logging.NewTestLogger(t), config, ingest.WithRestyClient(mockWOC.HttpClient()), ingest.WithSyncPeriod(100*time.Millisecond)) + ingestor := ingest.NewLiveIngestorWocPoll(logging.NewTestLogger(t), chain, ingest.IngestorWocPollOpts.WithRestyClient(mockWOC.HttpClient()), ingest.IngestorWocPollOpts.WithSyncPeriod(100*time.Millisecond)) // when: mockReceiver := testabilities.NewMockHeadersReceiver(t) @@ -155,13 +155,12 @@ func TestLiveIngestorWOCPoll_PollLast10Headers_TemporaryFailsDontBother(t *testi func TestLiveIngestorWocPoll_GetPresentHeight(t *testing.T) { // given: const expectedHeight = 920784 + chain := defs.NetworkMainnet - config := defs.DefaultWOCPollIngestorConfig() - - mockWOC := testabilities.GivenMockWOC(t, config.Chain) + mockWOC := testabilities.GivenMockWOC(t, chain) mockWOC.WillRespondOn("chain/info", "GET").WithJSONResponse(200, map[string]any{"blocks": expectedHeight}) - ingestor := ingest.NewLiveIngestorWocPoll(logging.NewTestLogger(t), config, ingest.WithRestyClient(mockWOC.HttpClient())) + ingestor := ingest.NewLiveIngestorWocPoll(logging.NewTestLogger(t), chain, ingest.IngestorWocPollOpts.WithRestyClient(mockWOC.HttpClient())) // when: presentHeight, err := ingestor.GetPresentHeight(t.Context()) diff --git a/pkg/services/chaintracks/ingest/woc_dto.go b/pkg/services/chaintracks/ingest/woc_dto.go index fe86abb3..44457249 100644 --- a/pkg/services/chaintracks/ingest/woc_dto.go +++ b/pkg/services/chaintracks/ingest/woc_dto.go @@ -37,6 +37,7 @@ func bitsStrToUint32(bitsStr string) (uint32, error) { const ( genesisAsPrevBlockHash = "0000000000000000000000000000000000000000000000000000000000000000" + prevChainWorkForGenesis = "0000000000000000000000000000000000000000000000000000000000000000" ) // ToWDK converts the WOCBlockHeaderDTO to a wdk.ChainBlockHeader, parsing and validating the Bits field. @@ -84,3 +85,7 @@ func (headers WOCBlockHeadersDTO) ToWDK() ([]*wdk.ChainBlockHeader, error) { type blockOnlyChainInfoDTO struct { Blocks uint `json:"blocks"` } + +type WOCHeadersResourcesDTO struct { + Files []string `json:"files"` +} diff --git a/pkg/services/chaintracks/models/height_range.go b/pkg/services/chaintracks/models/height_range.go index 26b7fc8d..a601dddb 100644 --- a/pkg/services/chaintracks/models/height_range.go +++ b/pkg/services/chaintracks/models/height_range.go @@ -235,3 +235,8 @@ func (hr HeightRange) Above(other HeightRange) HeightRange { func (hr HeightRange) Copy() HeightRange { return hr } + +// Overlaps returns true if there is any overlap between the receiver and the other HeightRange. +func (hr HeightRange) Overlaps(other HeightRange) bool { + return !hr.Intersect(other).IsEmpty() +} From 80e2e39fa82ac2296a9602001f33c5d469957b28 Mon Sep 17 00:00:00 2001 From: Krzysztof Tomecki <152964795+chris-4chain@users.noreply.github.com> Date: Mon, 17 Nov 2025 13:44:22 +0100 Subject: [PATCH 2/4] linter --- pkg/services/chaintracks/ingest/bulk_ingestor_woc.go | 11 +++++++++++ .../chaintracks/ingest/bulk_ingestor_woc_options.go | 7 +++++++ pkg/services/chaintracks/ingest/cdn_data.go | 1 + .../ingest/live_ingestor_woc_poll_options.go | 10 ++++++++++ pkg/services/chaintracks/ingest/woc_dto.go | 1 + 5 files changed, 30 insertions(+) diff --git a/pkg/services/chaintracks/ingest/bulk_ingestor_woc.go b/pkg/services/chaintracks/ingest/bulk_ingestor_woc.go index 99ac9dd8..93e3419a 100644 --- a/pkg/services/chaintracks/ingest/bulk_ingestor_woc.go +++ b/pkg/services/chaintracks/ingest/bulk_ingestor_woc.go @@ -14,12 +14,20 @@ import ( "github.com/go-softwarelab/common/pkg/to" ) +// BulkIngestorWOC provides logic to ingest and synchronize block headers from WhatsOnChain bulk endpoints. +// Utilizes a wocClient to fetch block headers and block height resources from the WhatsOnChain API service. +// Maintains a logger for structured logging and a chain identifier for selecting network-specific resources. +// Designed for efficient bulk fetching of header file metadata and incremental synchronization of chain state. type BulkIngestorWOC struct { logger *slog.Logger chain defs.BSVNetwork wocClient *wocClient } +// NewBulkIngestorWOC creates a new BulkIngestorWOC for a given logger, network, and optional configuration options. +// It sets up a dedicated WhatsOnChain bulk client for the specified BSV network and uses the provided logger. +// Optional configuration options allow customization such as API key or overriding the default HTTP client factory. +// Returns a pointer to the BulkIngestorWOC which can efficiently ingest and synchronize block header files. func NewBulkIngestorWOC(logger *slog.Logger, chain defs.BSVNetwork, opts ...func(options *BulkIngestorWocOptions)) *BulkIngestorWOC { logger = logging.Child(logger, "bulk_ingestor_woc") @@ -32,6 +40,9 @@ func NewBulkIngestorWOC(logger *slog.Logger, chain defs.BSVNetwork, opts ...func } } +// Synchronize fetches available bulk header files and selects those overlapping the specified height range. +// Synchronize returns metadata for the required files and a downloader for retrieving their data from WhatsOnChain. +// Synchronize returns an error if fetching or parsing file metadata fails, or if no appropriate files are found. func (b *BulkIngestorWOC) Synchronize(ctx context.Context, presentHeight uint, rangeToFetch models.HeightRange) ([]BulkHeaderFileInfo, BulkFileDownloader, error) { allFiles, err := b.fetchBulkHeaderFilesInfo(ctx) if err != nil { diff --git a/pkg/services/chaintracks/ingest/bulk_ingestor_woc_options.go b/pkg/services/chaintracks/ingest/bulk_ingestor_woc_options.go index a7780eed..f89fe744 100644 --- a/pkg/services/chaintracks/ingest/bulk_ingestor_woc_options.go +++ b/pkg/services/chaintracks/ingest/bulk_ingestor_woc_options.go @@ -5,27 +5,34 @@ import ( "github.com/go-resty/resty/v2" ) +// BulkIngestorWocOptions provides configuration for bulk ingestion using a configurable HTTP client and optional API key. type BulkIngestorWocOptions struct { RestyClientFactory *httpx.RestyClientFactory APIKey string } +// DefaultBulkIngestorWocOptions returns the default BulkIngestorWocOptions with a configured RestyClientFactory. func DefaultBulkIngestorWocOptions() BulkIngestorWocOptions { return BulkIngestorWocOptions{ RestyClientFactory: httpx.NewRestyClientFactory(), } } +// BulkIngestorWocOptionsBuilder provides builder methods to configure BulkIngestorWocOptions for bulk ingestion. type BulkIngestorWocOptionsBuilder struct{} +// BulkIngestorWocOpts provides option builder methods for customizing BulkIngestorWocOptions configuration. var BulkIngestorWocOpts BulkIngestorWocOptionsBuilder +// WithRestyClient sets a custom resty.Client to be used for HTTP requests in BulkIngestorWocOptions. +// It overrides the default RestyClientFactory with one based on the provided client instance. func (BulkIngestorWocOptionsBuilder) WithRestyClient(client *resty.Client) func(*BulkIngestorWocOptions) { return func(options *BulkIngestorWocOptions) { options.RestyClientFactory = httpx.NewRestyClientFactoryWithBase(client) } } +// WithAPIKey sets the API key to be used with the BulkIngestorWocOptions instance. func (BulkIngestorWocOptionsBuilder) WithAPIKey(apiKey string) func(*BulkIngestorWocOptions) { return func(options *BulkIngestorWocOptions) { options.APIKey = apiKey diff --git a/pkg/services/chaintracks/ingest/cdn_data.go b/pkg/services/chaintracks/ingest/cdn_data.go index c4a3f5b4..1425642e 100644 --- a/pkg/services/chaintracks/ingest/cdn_data.go +++ b/pkg/services/chaintracks/ingest/cdn_data.go @@ -47,6 +47,7 @@ func (b *BulkHeaderFileInfo) Equals(other *BulkHeaderFileInfo) bool { } +// ToHeightRange returns the HeightRange spanned by this BulkHeaderFileInfo based on its FirstHeight and Count fields. func (b *BulkHeaderFileInfo) ToHeightRange() models.HeightRange { return models.NewHeightRange(b.FirstHeight, b.FirstHeight+must.ConvertToUInt(b.Count)-1) } diff --git a/pkg/services/chaintracks/ingest/live_ingestor_woc_poll_options.go b/pkg/services/chaintracks/ingest/live_ingestor_woc_poll_options.go index 4f44b04a..fe365293 100644 --- a/pkg/services/chaintracks/ingest/live_ingestor_woc_poll_options.go +++ b/pkg/services/chaintracks/ingest/live_ingestor_woc_poll_options.go @@ -7,12 +7,15 @@ import ( "github.com/go-resty/resty/v2" ) +// IngestorWocPollOptions defines configuration settings for polling external data in the WOC ingestor component. type IngestorWocPollOptions struct { RestyClientFactory *httpx.RestyClientFactory SyncPeriod time.Duration APIKey string } +// DefaultIngestorWocPollOptions returns default options for polling block headers from an external source. +// Sets a Resty client factory and a 60-second sync period in the returned IngestorWocPollOptions struct. func DefaultIngestorWocPollOptions() IngestorWocPollOptions { return IngestorWocPollOptions{ RestyClientFactory: httpx.NewRestyClientFactory(), @@ -20,21 +23,28 @@ func DefaultIngestorWocPollOptions() IngestorWocPollOptions { } } +// IngestorWocPollOptionsBuilder provides methods to configure options for polling data ingestion. type IngestorWocPollOptionsBuilder struct{} +// IngestorWocPollOpts provides builder methods for configuring polling options with WOC-based ingestors. var IngestorWocPollOpts IngestorWocPollOptionsBuilder +// WithRestyClient sets a custom resty.Client instance for the underlying HTTP client in polling options. +// It returns a functional option to configure IngestorWocPollOptions with the provided Resty client for network requests. func (IngestorWocPollOptionsBuilder) WithRestyClient(client *resty.Client) func(*IngestorWocPollOptions) { return func(options *IngestorWocPollOptions) { options.RestyClientFactory = httpx.NewRestyClientFactoryWithBase(client) } } +// WithSyncPeriod sets the synchronization interval for polling in the ingestor options builder. func (IngestorWocPollOptionsBuilder) WithSyncPeriod(period time.Duration) func(*IngestorWocPollOptions) { return func(options *IngestorWocPollOptions) { options.SyncPeriod = period } } + +// WithAPIKey sets the API key for authenticating requests to the data provider in the ingest polling options builder. func (IngestorWocPollOptionsBuilder) WithAPIKey(apiKey string) func(*IngestorWocPollOptions) { return func(options *IngestorWocPollOptions) { options.APIKey = apiKey diff --git a/pkg/services/chaintracks/ingest/woc_dto.go b/pkg/services/chaintracks/ingest/woc_dto.go index 44457249..5eb59f9d 100644 --- a/pkg/services/chaintracks/ingest/woc_dto.go +++ b/pkg/services/chaintracks/ingest/woc_dto.go @@ -86,6 +86,7 @@ type blockOnlyChainInfoDTO struct { Blocks uint `json:"blocks"` } +// WOCHeadersResourcesDTO represents a response containing a list of available block header resource files. type WOCHeadersResourcesDTO struct { Files []string `json:"files"` } From 20dbea125861cc374f79698955d78857547c44de Mon Sep 17 00:00:00 2001 From: github-actions <41898282+github-actions[bot]@users.noreply.github.com> Date: Mon, 17 Nov 2025 12:46:24 +0000 Subject: [PATCH 3/4] chore: update generated files --- pkg/defs/chaintracks.go | 2 +- pkg/services/chaintracks/ingest/woc_dto.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/defs/chaintracks.go b/pkg/defs/chaintracks.go index 04028108..fa1b46ca 100644 --- a/pkg/defs/chaintracks.go +++ b/pkg/defs/chaintracks.go @@ -24,7 +24,7 @@ type ChaintracksServiceConfig struct { Chain BSVNetwork `mapstructure:"-"` LiveIngestors []LiveIngestorType `mapstructure:"live_ingestors"` CDNBulkIngestors []CDNBulkIngestorConfig `mapstructure:"cdn_bulk_ingestors"` - WocAPIKey string `mapstructure:"woc_api_key"` + WocAPIKey string `mapstructure:"woc_api_key"` } // Validate checks if the Chain field in ChaintracksServiceConfig holds a valid BSV network type. diff --git a/pkg/services/chaintracks/ingest/woc_dto.go b/pkg/services/chaintracks/ingest/woc_dto.go index 5eb59f9d..04f71eb5 100644 --- a/pkg/services/chaintracks/ingest/woc_dto.go +++ b/pkg/services/chaintracks/ingest/woc_dto.go @@ -36,7 +36,7 @@ func bitsStrToUint32(bitsStr string) (uint32, error) { } const ( - genesisAsPrevBlockHash = "0000000000000000000000000000000000000000000000000000000000000000" + genesisAsPrevBlockHash = "0000000000000000000000000000000000000000000000000000000000000000" prevChainWorkForGenesis = "0000000000000000000000000000000000000000000000000000000000000000" ) From 5a109e27e572e6ceeb0ae3bb3bc283d851544711 Mon Sep 17 00:00:00 2001 From: Krzysztof Tomecki <152964795+chris-4chain@users.noreply.github.com> Date: Mon, 17 Nov 2025 14:01:11 +0100 Subject: [PATCH 4/4] self-review --- pkg/services/chaintracks/bulk_manager.go | 5 ++--- pkg/services/chaintracks/ingest/bulk_ingestor_woc.go | 4 ++-- .../{chaintraks_woc_client.go => chaintracks_woc_client.go} | 4 ++++ 3 files changed, 8 insertions(+), 5 deletions(-) rename pkg/services/chaintracks/ingest/{chaintraks_woc_client.go => chaintracks_woc_client.go} (97%) diff --git a/pkg/services/chaintracks/bulk_manager.go b/pkg/services/chaintracks/bulk_manager.go index ad63aec7..219cee8f 100644 --- a/pkg/services/chaintracks/bulk_manager.go +++ b/pkg/services/chaintracks/bulk_manager.go @@ -32,7 +32,7 @@ func newBulkManager(logger *slog.Logger, bulkIngestors []NamedBulkIngestor) *bul } } -func (bm *bulkManager) SyncBulkStorage(ctx context.Context, presentHeight uint, initialRanges models.HeightRanges) (err error) { +func (bm *bulkManager) SyncBulkStorage(ctx context.Context, presentHeight uint, initialRanges models.HeightRanges) error { bm.logger.Info("Starting bulk synchronization", slog.Any("present_height", presentHeight), slog.Any("initial_ranges", initialRanges)) missingRange := models.NewHeightRange(0, presentHeight) @@ -40,6 +40,7 @@ func (bm *bulkManager) SyncBulkStorage(ctx context.Context, presentHeight uint, if missingRange.IsEmpty() { break } + bulkChunks, downloader, err := ingestor.Ingestor.Synchronize(ctx, presentHeight, missingRange) if err != nil { bm.logger.Error("Chaintracks service - error during bulk synchronization", slog.String("ingestor_name", ingestor.Name), slog.String("error", err.Error())) @@ -62,8 +63,6 @@ func (bm *bulkManager) SyncBulkStorage(ctx context.Context, presentHeight uint, if err != nil { return fmt.Errorf("failed to compute missing height range after ingestor %s: %w", ingestor.Name, err) } - - // TODO: Implement DONE check and break if done } return nil diff --git a/pkg/services/chaintracks/ingest/bulk_ingestor_woc.go b/pkg/services/chaintracks/ingest/bulk_ingestor_woc.go index 93e3419a..5bad7459 100644 --- a/pkg/services/chaintracks/ingest/bulk_ingestor_woc.go +++ b/pkg/services/chaintracks/ingest/bulk_ingestor_woc.go @@ -192,8 +192,8 @@ func (b *BulkIngestorWOC) parseURL(ctx context.Context, url string) (filename st return } -// getRedirectURLForLatest doesn't follow redirects, instead it fetches the Location header from the 302 response -// this is needed to get the height range from the redirected URL +// getLatestHeightRange performs a HEAD request to the given latest URL to retrieve the Content-Disposition header. +// It extracts the filename from the header to determine the actual height range of the latest bulk header func (b *BulkIngestorWOC) getLatestHeightRange(ctx context.Context, latestURL string) (string, error) { contentHeader, err := b.wocClient.GetContentDispositionFilename(ctx, latestURL) if err != nil { diff --git a/pkg/services/chaintracks/ingest/chaintraks_woc_client.go b/pkg/services/chaintracks/ingest/chaintracks_woc_client.go similarity index 97% rename from pkg/services/chaintracks/ingest/chaintraks_woc_client.go rename to pkg/services/chaintracks/ingest/chaintracks_woc_client.go index f95d73b8..1d6591e8 100644 --- a/pkg/services/chaintracks/ingest/chaintraks_woc_client.go +++ b/pkg/services/chaintracks/ingest/chaintracks_woc_client.go @@ -158,6 +158,10 @@ func (c *wocClient) GetContentDispositionFilename(ctx context.Context, fileURL s return "", fmt.Errorf("failed to fetch latest bulk header file URL: %w", err) } + if !resp.IsSuccess() { + return "", fmt.Errorf("failed to fetch latest bulk header file URL: received status code %d", resp.StatusCode()) + } + return resp.Header().Get("Content-Disposition"), nil }