diff --git a/config/cleanup.go b/config/cleanup.go new file mode 100644 index 000000000..7da3311e2 --- /dev/null +++ b/config/cleanup.go @@ -0,0 +1,15 @@ +package config + +import "time" + +// CleanupSettings contains configuration for the Cleanup service. +type CleanupSettings struct { + // GRPCListenAddress is the address the cleanup service listens on for gRPC requests + GRPCListenAddress string + + // GRPCAddress is the address other services use to connect to the cleanup service + GRPCAddress string + + // PollingInterval is how often to poll block assembly state for cleanup triggers + PollingInterval time.Duration +} diff --git a/daemon/daemon.go b/daemon/daemon.go index 98f0855af..fe196821d 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -57,6 +57,7 @@ const ( loggerBlockchain = "bchn" loggerBlockchainClient = "bchc" loggerBlockchainSQL = "bcsql" + loggerCleanup = "cleanup" loggerKafkaConsumerBlocks = "kcb" loggerKafkaConsumerRejectedTx = "kcrtx" loggerKafkaConsumerSubtree = "kcs" @@ -88,6 +89,8 @@ const ( serviceBlockPersister = "blockpersister" serviceBlockPersisterFormal = "BlockPersister" serviceBlockValidation = "blockvalidation" + serviceCleanup = "cleanup" + serviceCleanupFormal = "Cleanup" serviceBlockValidationFormal = "BlockValidation" serviceBlockchainFormal = "Blockchain" serviceHelp = "help" @@ -511,6 +514,9 @@ func printUsage() { fmt.Println(" -alert=<1|0>") fmt.Println(" whether to start the alert service") fmt.Println("") + fmt.Println(" -cleanup=<1|0>") + fmt.Println(" whether to start the cleanup service") + fmt.Println("") fmt.Println(" -all=0") fmt.Println(" disable all services unless explicitly overridden") fmt.Println("") diff --git a/daemon/daemon_services.go b/daemon/daemon_services.go index 486b7782f..8392aa686 100644 --- a/daemon/daemon_services.go +++ b/daemon/daemon_services.go @@ -15,6 +15,7 @@ import ( "github.com/bsv-blockchain/teranode/services/blockchain" "github.com/bsv-blockchain/teranode/services/blockpersister" "github.com/bsv-blockchain/teranode/services/blockvalidation" + "github.com/bsv-blockchain/teranode/services/cleanup" "github.com/bsv-blockchain/teranode/services/legacy" "github.com/bsv-blockchain/teranode/services/legacy/peer" "github.com/bsv-blockchain/teranode/services/p2p" @@ -68,6 +69,7 @@ func (d *Daemon) startServices(ctx context.Context, logger ulogger.Logger, appSe startLegacy := d.shouldStart(serviceLegacyFormal, args) startRPC := d.shouldStart(serviceRPCFormal, args) startAlert := d.shouldStart(serviceAlertFormal, args) + startCleanup := d.shouldStart(serviceCleanupFormal, args) // Create the application count based on the services that are going to be started d.appCount += len(d.externalServices) @@ -117,6 +119,7 @@ func (d *Daemon) startServices(ctx context.Context, logger ulogger.Logger, appSe {startValidator, func() error { return d.startValidatorService(ctx, appSettings, createLogger) }}, {startPropagation, func() error { return d.startPropagationService(ctx, appSettings, createLogger) }}, {startLegacy, func() error { return d.startLegacyService(ctx, appSettings, createLogger) }}, + {startCleanup, func() error { return d.startCleanupService(ctx, appSettings, createLogger) }}, } // Loop through and start each service if needed @@ -1094,3 +1097,38 @@ func (d *Daemon) startLegacyService( blockassemblyClient, )) } + +// startCleanupService initializes and adds the Cleanup service to the ServiceManager. 
+func (d *Daemon) startCleanupService(ctx context.Context, appSettings *settings.Settings, + createLogger func(string) ulogger.Logger) error { + // Create the UTXO store for the Cleanup service + utxoStore, err := d.daemonStores.GetUtxoStore(ctx, createLogger(loggerUtxos), appSettings) + if err != nil { + return err + } + + // Create the blockchain client for the Cleanup service + blockchainClient, err := d.daemonStores.GetBlockchainClient( + ctx, createLogger(loggerBlockchainClient), appSettings, serviceCleanup, + ) + if err != nil { + return err + } + + // Create the block assembly client for the Cleanup service + blockAssemblyClient, err := blockassembly.NewClient(ctx, createLogger(loggerBlockAssembly), appSettings) + if err != nil { + return err + } + + // Add the Cleanup service to the ServiceManager + return d.ServiceManager.AddService(serviceCleanupFormal, cleanup.New( + ctx, + createLogger(loggerCleanup), + appSettings, + utxoStore, + blockchainClient, + blockAssemblyClient, + nil, // blobStore not needed for cleanup service + )) +} diff --git a/proto/cleanup/service.pb.go b/proto/cleanup/service.pb.go new file mode 100644 index 000000000..69f0e8756 --- /dev/null +++ b/proto/cleanup/service.pb.go @@ -0,0 +1,174 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.10 +// protoc v6.33.0 +// source: proto/cleanup/service.proto + +package cleanup_api + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type EmptyMessage struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *EmptyMessage) Reset() { + *x = EmptyMessage{} + mi := &file_proto_cleanup_service_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *EmptyMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EmptyMessage) ProtoMessage() {} + +func (x *EmptyMessage) ProtoReflect() protoreflect.Message { + mi := &file_proto_cleanup_service_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EmptyMessage.ProtoReflect.Descriptor instead. 
+func (*EmptyMessage) Descriptor() ([]byte, []int) { + return file_proto_cleanup_service_proto_rawDescGZIP(), []int{0} +} + +type HealthResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Ok bool `protobuf:"varint,1,opt,name=ok,proto3" json:"ok,omitempty"` + Details string `protobuf:"bytes,2,opt,name=details,proto3" json:"details,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *HealthResponse) Reset() { + *x = HealthResponse{} + mi := &file_proto_cleanup_service_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *HealthResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HealthResponse) ProtoMessage() {} + +func (x *HealthResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_cleanup_service_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HealthResponse.ProtoReflect.Descriptor instead. +func (*HealthResponse) Descriptor() ([]byte, []int) { + return file_proto_cleanup_service_proto_rawDescGZIP(), []int{1} +} + +func (x *HealthResponse) GetOk() bool { + if x != nil { + return x.Ok + } + return false +} + +func (x *HealthResponse) GetDetails() string { + if x != nil { + return x.Details + } + return "" +} + +var File_proto_cleanup_service_proto protoreflect.FileDescriptor + +const file_proto_cleanup_service_proto_rawDesc = "" + + "\n" + + "\x1bproto/cleanup/service.proto\x12\acleanup\"\x0e\n" + + "\fEmptyMessage\":\n" + + "\x0eHealthResponse\x12\x0e\n" + + "\x02ok\x18\x01 \x01(\bR\x02ok\x12\x18\n" + + "\adetails\x18\x02 \x01(\tR\adetails2H\n" + + "\n" + + "CleanupAPI\x12:\n" + + "\x06Health\x12\x15.cleanup.EmptyMessage\x1a\x17.cleanup.HealthResponse\"\x00B=Z;github.com/bitcoin-sv/teranode/services/cleanup/cleanup_apib\x06proto3" + +var ( + file_proto_cleanup_service_proto_rawDescOnce sync.Once + file_proto_cleanup_service_proto_rawDescData []byte +) + +func file_proto_cleanup_service_proto_rawDescGZIP() []byte { + file_proto_cleanup_service_proto_rawDescOnce.Do(func() { + file_proto_cleanup_service_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_proto_cleanup_service_proto_rawDesc), len(file_proto_cleanup_service_proto_rawDesc))) + }) + return file_proto_cleanup_service_proto_rawDescData +} + +var file_proto_cleanup_service_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_proto_cleanup_service_proto_goTypes = []any{ + (*EmptyMessage)(nil), // 0: cleanup.EmptyMessage + (*HealthResponse)(nil), // 1: cleanup.HealthResponse +} +var file_proto_cleanup_service_proto_depIdxs = []int32{ + 0, // 0: cleanup.CleanupAPI.Health:input_type -> cleanup.EmptyMessage + 1, // 1: cleanup.CleanupAPI.Health:output_type -> cleanup.HealthResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_proto_cleanup_service_proto_init() } +func file_proto_cleanup_service_proto_init() { + if File_proto_cleanup_service_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: 
unsafe.Slice(unsafe.StringData(file_proto_cleanup_service_proto_rawDesc), len(file_proto_cleanup_service_proto_rawDesc)), + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_proto_cleanup_service_proto_goTypes, + DependencyIndexes: file_proto_cleanup_service_proto_depIdxs, + MessageInfos: file_proto_cleanup_service_proto_msgTypes, + }.Build() + File_proto_cleanup_service_proto = out.File + file_proto_cleanup_service_proto_goTypes = nil + file_proto_cleanup_service_proto_depIdxs = nil +} diff --git a/proto/cleanup/service_grpc.pb.go b/proto/cleanup/service_grpc.pb.go new file mode 100644 index 000000000..8428e9fa9 --- /dev/null +++ b/proto/cleanup/service_grpc.pb.go @@ -0,0 +1,121 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.5.1 +// - protoc v6.33.0 +// source: proto/cleanup/service.proto + +package cleanup_api + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + CleanupAPI_Health_FullMethodName = "/cleanup.CleanupAPI/Health" +) + +// CleanupAPIClient is the client API for CleanupAPI service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type CleanupAPIClient interface { + Health(ctx context.Context, in *EmptyMessage, opts ...grpc.CallOption) (*HealthResponse, error) +} + +type cleanupAPIClient struct { + cc grpc.ClientConnInterface +} + +func NewCleanupAPIClient(cc grpc.ClientConnInterface) CleanupAPIClient { + return &cleanupAPIClient{cc} +} + +func (c *cleanupAPIClient) Health(ctx context.Context, in *EmptyMessage, opts ...grpc.CallOption) (*HealthResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(HealthResponse) + err := c.cc.Invoke(ctx, CleanupAPI_Health_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// CleanupAPIServer is the server API for CleanupAPI service. +// All implementations must embed UnimplementedCleanupAPIServer +// for forward compatibility. +type CleanupAPIServer interface { + Health(context.Context, *EmptyMessage) (*HealthResponse, error) + mustEmbedUnimplementedCleanupAPIServer() +} + +// UnimplementedCleanupAPIServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedCleanupAPIServer struct{} + +func (UnimplementedCleanupAPIServer) Health(context.Context, *EmptyMessage) (*HealthResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Health not implemented") +} +func (UnimplementedCleanupAPIServer) mustEmbedUnimplementedCleanupAPIServer() {} +func (UnimplementedCleanupAPIServer) testEmbeddedByValue() {} + +// UnsafeCleanupAPIServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to CleanupAPIServer will +// result in compilation errors. 
+type UnsafeCleanupAPIServer interface { + mustEmbedUnimplementedCleanupAPIServer() +} + +func RegisterCleanupAPIServer(s grpc.ServiceRegistrar, srv CleanupAPIServer) { + // If the following call pancis, it indicates UnimplementedCleanupAPIServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&CleanupAPI_ServiceDesc, srv) +} + +func _CleanupAPI_Health_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(EmptyMessage) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CleanupAPIServer).Health(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: CleanupAPI_Health_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CleanupAPIServer).Health(ctx, req.(*EmptyMessage)) + } + return interceptor(ctx, in, info, handler) +} + +// CleanupAPI_ServiceDesc is the grpc.ServiceDesc for CleanupAPI service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var CleanupAPI_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "cleanup.CleanupAPI", + HandlerType: (*CleanupAPIServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Health", + Handler: _CleanupAPI_Health_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "proto/cleanup/service.proto", +} diff --git a/services/blockassembly/BlockAssembler.go b/services/blockassembly/BlockAssembler.go index 950bc83c3..51a784774 100644 --- a/services/blockassembly/BlockAssembler.go +++ b/services/blockassembly/BlockAssembler.go @@ -7,7 +7,6 @@ import ( "encoding/binary" "fmt" "sort" - "strconv" "strings" "sync" "sync/atomic" @@ -22,7 +21,6 @@ import ( "github.com/bsv-blockchain/teranode/services/blockchain" "github.com/bsv-blockchain/teranode/settings" "github.com/bsv-blockchain/teranode/stores/blob" - "github.com/bsv-blockchain/teranode/stores/cleanup" "github.com/bsv-blockchain/teranode/stores/utxo" "github.com/bsv-blockchain/teranode/stores/utxo/fields" "github.com/bsv-blockchain/teranode/stores/utxo/meta" @@ -128,10 +126,6 @@ type BlockAssembler struct { stateChangeMu sync.RWMutex stateChangeCh chan BestBlockInfo - // lastPersistedHeight tracks the last block height processed by block persister - // This is updated via BlockPersisted notifications and used to coordinate with cleanup - lastPersistedHeight atomic.Uint32 - // currentChainMap maps block hashes to their heights currentChainMap map[chainhash.Hash]uint32 @@ -153,18 +147,6 @@ type BlockAssembler struct { // currentRunningState tracks the current operational state currentRunningState atomic.Value - // cleanupService manages background cleanup tasks - cleanupService cleanup.Service - - // cleanupServiceLoaded indicates if the cleanup service has been loaded - cleanupServiceLoaded atomic.Bool - - // cleanupQueueCh queues cleanup operations (parent preserve + DAH cleanup) to prevent flooding during catchup - cleanupQueueCh chan uint32 - - // cleanupQueueWorkerStarted tracks if the cleanup queue worker is running - cleanupQueueWorkerStarted atomic.Bool - // unminedCleanupTicker manages periodic cleanup of old unmined 
transactions unminedCleanupTicker *time.Ticker // cachedCandidate stores the cached mining candidate @@ -363,28 +345,6 @@ func (b *BlockAssembler) startChannelListeners(ctx context.Context) (err error) if notification.Type == model.NotificationType_Block { b.processNewBlockAnnouncement(ctx) - } else if notification.Type == model.NotificationType_BlockPersisted { - // RUNTIME COORDINATION: Update persisted height from block persister - // - // Block persister sends this notification after successfully persisting a block - // and creating its .subtree_data file. We track this height so cleanup can safely - // delete transactions from earlier blocks without breaking catchup. - // - // This notification-based update keeps our persisted height current during normal - // operation. Combined with the startup initialization from blockchain state, - // we always know how far block persister has progressed. - // - // Cleanup uses this via GetLastPersistedHeight() to calculate safe deletion height. - if notification.Metadata != nil && notification.Metadata.Metadata != nil { - if heightStr, ok := notification.Metadata.Metadata["height"]; ok { - if height, err := strconv.ParseUint(heightStr, 10, 32); err == nil { - b.lastPersistedHeight.Store(uint32(height)) - b.logger.Debugf("[BlockAssembler] Block persister progress: height %d", height) - } else { - b.logger.Warnf("[BlockAssembler] Failed to parse persisted height from notification: %v", err) - } - } - } } b.setCurrentRunningState(StateRunning) @@ -726,27 +686,6 @@ func (b *BlockAssembler) setBestBlockHeader(bestBlockchainBlockHeader *model.Blo // Invalidate cache when block height changes b.invalidateMiningCandidateCache() - - // Queue cleanup operations to prevent flooding during catchup - // The cleanup queue worker processes operations sequentially (parent preserve → DAH cleanup) - // Capture channel reference to avoid TOCTOU race between nil check and send - ch := b.cleanupQueueCh - if b.utxoStore != nil && b.cleanupServiceLoaded.Load() && b.cleanupService != nil && height > 0 && ch != nil && b.cleanupQueueWorkerStarted.Load() { - // Non-blocking send - drop if queue is full (shouldn't happen with 100 buffer, but safety check) - func() { - defer func() { - if r := recover(); r != nil { - b.logger.Debugf("[BlockAssembler] cleanup queue closed; skipping cleanup for height %d", height) - } - }() - select { - case ch <- height: - // Successfully queued - default: - b.logger.Warnf("[BlockAssembler] cleanup queue full, dropping cleanup for height %d", height) - } - }() - } } // setCurrentRunningState sets the current operational state. @@ -766,106 +705,6 @@ func (b *BlockAssembler) GetCurrentRunningState() State { return b.currentRunningState.Load().(State) } -// GetLastPersistedHeight returns the last block height processed by block persister. -// This is used by cleanup service to avoid deleting transactions before they're persisted. -// -// Returns: -// - uint32: Last persisted block height -func (b *BlockAssembler) GetLastPersistedHeight() uint32 { - return b.lastPersistedHeight.Load() -} - -// startCleanupQueueWorker starts a background worker that processes cleanup operations sequentially. -// This prevents flooding the system with concurrent cleanup operations during block catchup. -// -// The worker processes one block height at a time, running parent preserve followed by DAH cleanup. -// If multiple heights are queued, only the latest is processed (deduplication). 
-// -// Parameters: -// - ctx: Context for cancellation -func (b *BlockAssembler) startCleanupQueueWorker(ctx context.Context) { - // Only start once - if !b.cleanupQueueWorkerStarted.CompareAndSwap(false, true) { - return - } - - // Initialize the cleanup queue channel with a buffer to handle bursts during catchup - b.cleanupQueueCh = make(chan uint32, 100) - - go func() { - defer func() { - // Close the channel - no need to drain because: - // 1. Senders use non-blocking send (select with default) - // 2. Channel is never set to nil, so no risk of blocking on nil channel - close(b.cleanupQueueCh) - b.cleanupQueueWorkerStarted.Store(false) - }() - - for { - select { - case <-ctx.Done(): - b.logger.Infof("[BlockAssembler] cleanup queue worker stopping") - return - - case height := <-b.cleanupQueueCh: - // Deduplicate: drain any additional heights and only process the latest - latestHeight := height - drained := false - for { - select { - case nextHeight := <-b.cleanupQueueCh: - latestHeight = nextHeight - drained = true - default: - // No more heights in queue - if drained { - b.logger.Debugf("[BlockAssembler] deduplicating cleanup operations, skipping to height %d", latestHeight) - } - goto processHeight - } - } - - processHeight: - // Step 1: Preserve parents of old unmined transactions FIRST - // This sets preserve_until and clears delete_at_height on parent transactions - if b.utxoStore != nil { - _, err := utxo.PreserveParentsOfOldUnminedTransactions(ctx, b.utxoStore, latestHeight, b.settings, b.logger) - if err != nil { - b.logger.Errorf("[BlockAssembler] error preserving parents during block height %d update: %v", latestHeight, err) - continue - } - } - - // Step 2: Then trigger DAH cleanup and WAIT for it to complete - // This ensures true sequential execution: preserve → cleanup (complete) → next height - // Without waiting, the cleanup is queued but runs async, which could cause races - if b.cleanupServiceLoaded.Load() && b.cleanupService != nil { - // Create a channel to wait for completion - doneCh := make(chan string, 1) - - if err := b.cleanupService.UpdateBlockHeight(latestHeight, doneCh); err != nil { - b.logger.Errorf("[BlockAssembler] cleanup service error updating block height %d: %v", latestHeight, err) - continue - } - - // Wait for cleanup to complete or context cancellation - select { - case status := <-doneCh: - if status != "completed" { - b.logger.Warnf("[BlockAssembler] cleanup for height %d finished with status: %s", latestHeight, status) - } - case <-ctx.Done(): - b.logger.Infof("[BlockAssembler] context cancelled while waiting for cleanup at height %d", latestHeight) - return - } - } - } - } - }() - - b.logger.Infof("[BlockAssembler] cleanup queue worker started") -} - // Start initializes and begins the block assembler operations. // // Parameters: @@ -896,81 +735,6 @@ func (b *BlockAssembler) Start(ctx context.Context) (err error) { return errors.NewProcessingError("[BlockAssembler] failed to start channel listeners: %v", err) } - // CRITICAL STARTUP COORDINATION: Initialize persisted height from block persister's state - // - // PROBLEM: Block persister creates .subtree_data files after a delay (BlockPersisterPersistAge blocks), - // but cleanup deletes transactions based only on delete_at_height. If cleanup runs before block persister - // has created .subtree_data files, those files will reference deleted transactions, causing catchup failures - // with "subtree length does not match tx data length" errors (actually missing transactions). 
- // - // SOLUTION: Cleanup coordinates with block persister by limiting deletion to: - // max_cleanup_height = min(requested_cleanup_height, persisted_height + retention) - // - // STARTUP RACE: Block persister notifications arrive asynchronously after BlockAssembler starts. - // If cleanup runs before the first notification arrives, it doesn't know the persisted height and - // could delete transactions that block persister still needs. - // - // PREVENTION: Read block persister's last persisted height from blockchain state on startup. - // Block persister publishes this state on its own startup, so we have the current value immediately. - // - // SCENARIOS: - // 1. Block persister running: State available, cleanup immediately coordinates correctly - // 2. Block persister not deployed: State missing, cleanup proceeds normally (height=0 disables coordination) - // 3. Block persister hasn't started yet: State missing, will get notification soon, cleanup waits - // 4. Block persister disabled: State missing, cleanup works without coordination - // - // All scenarios are safe. This prevents premature cleanup during the startup window. - if state, err := b.blockchainClient.GetState(ctx, "BlockPersisterHeight"); err == nil && len(state) >= 4 { - height := binary.LittleEndian.Uint32(state) - if height > 0 { - b.lastPersistedHeight.Store(height) - b.logger.Infof("[BlockAssembler] Initialized persisted height from block persister state: %d", height) - } - } else if err != nil { - // State doesn't exist - block persister either not deployed, hasn't started, or first run. - // All cases are safe (cleanup checks for height=0 and proceeds normally without coordination). - b.logger.Debugf("[BlockAssembler] Block persister state not available: %v", err) - } - - // Check if the UTXO store supports cleanup operations - if !b.settings.UtxoStore.DisableDAHCleaner { - if cleanupServiceProvider, ok := b.utxoStore.(cleanup.CleanupServiceProvider); ok { - b.logger.Infof("[BlockAssembler] initialising cleanup service") - - b.cleanupService, err = cleanupServiceProvider.GetCleanupService() - if err != nil { - return err - } - - if b.cleanupService != nil { - // CLEANUP COORDINATION: Wire up block persister progress tracking - // - // Cleanup needs to know how far block persister has progressed so it doesn't - // delete transactions that block persister still needs to create .subtree_data files. - // - // The cleanup service will call GetLastPersistedHeight() before each cleanup operation - // and limit deletion to: min(requested_height, persisted_height + retention) - // - // This getter provides the persisted height that was: - // 1. Initialized from blockchain state on startup (preventing startup race) - // 2. Updated via BlockPersisted notifications during runtime (keeping current) - // - // See processCleanupJob in cleanup_service.go for the coordination logic. 
- b.cleanupService.SetPersistedHeightGetter(b.GetLastPersistedHeight) - b.logger.Infof("[BlockAssembler] Configured cleanup service to coordinate with block persister") - - b.logger.Infof("[BlockAssembler] starting cleanup service") - b.cleanupService.Start(ctx) - } - - b.cleanupServiceLoaded.Store(true) - - // Start the cleanup queue worker to process parent preserve and DAH cleanup operations - // This prevents flooding the system with concurrent operations during block catchup - b.startCleanupQueueWorker(ctx) - } - } - _, height := b.CurrentBlock() prometheusBlockAssemblyCurrentBlockHeight.Set(float64(height)) diff --git a/services/blockassembly/BlockAssembler_test.go b/services/blockassembly/BlockAssembler_test.go index 97b939049..09b84f271 100644 --- a/services/blockassembly/BlockAssembler_test.go +++ b/services/blockassembly/BlockAssembler_test.go @@ -1859,82 +1859,6 @@ func TestBlockAssembly_processNewBlockAnnouncement_ErrorHandling(t *testing.T) { }) } -func TestBlockAssembly_setBestBlockHeader_CleanupServiceFailures(t *testing.T) { - t.Run("setBestBlockHeader handles cleanup service failures gracefully", func(t *testing.T) { - initPrometheusMetrics() - testItems := setupBlockAssemblyTest(t) - require.NotNil(t, testItems) - - // Create a mock cleanup service that fails - mockCleanupService := &MockCleanupService{} - mockCleanupService.On("UpdateBlockHeight", mock.Anything, mock.Anything).Return(errors.NewProcessingError("cleanup service failed")) - - // Set the cleanup service and mark it as loaded - testItems.blockAssembler.cleanupService = mockCleanupService - testItems.blockAssembler.cleanupServiceLoaded.Store(true) - - // Start the cleanup queue worker - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - testItems.blockAssembler.startCleanupQueueWorker(ctx) - - // Set state to running so cleanup is triggered - testItems.blockAssembler.setCurrentRunningState(StateRunning) - - // Create a new block header - newHeader := &model.BlockHeader{ - Version: 1, - HashPrevBlock: model.GenesisBlockHeader.Hash(), - HashMerkleRoot: &chainhash.Hash{}, - Timestamp: 1234567890, - Bits: model.NBit{}, - Nonce: 1234, - } - newHeight := uint32(1) - - // Call setBestBlockHeader - should not panic or fail even if cleanup service fails - testItems.blockAssembler.setBestBlockHeader(newHeader, newHeight) - - // Verify the block header was still set correctly - currentHeader, currentHeight := testItems.blockAssembler.CurrentBlock() - assert.Equal(t, newHeader, currentHeader) - assert.Equal(t, newHeight, currentHeight) - - // Wait for background goroutine to complete (parent preserve + cleanup trigger) - time.Sleep(100 * time.Millisecond) - - // Verify cleanup service was called - mockCleanupService.AssertCalled(t, "UpdateBlockHeight", newHeight, mock.Anything) - }) - - t.Run("setBestBlockHeader skips cleanup when cleanup service not loaded", func(t *testing.T) { - initPrometheusMetrics() - testItems := setupBlockAssemblyTest(t) - require.NotNil(t, testItems) - - // Ensure cleanup service is not loaded - testItems.blockAssembler.cleanupServiceLoaded.Store(false) - - newHeader := &model.BlockHeader{ - Version: 1, - HashPrevBlock: model.GenesisBlockHeader.Hash(), - HashMerkleRoot: &chainhash.Hash{}, - Timestamp: 1234567890, - Bits: model.NBit{}, - Nonce: 1234, - } - newHeight := uint32(1) - - // This should work without any cleanup service calls - testItems.blockAssembler.setBestBlockHeader(newHeader, newHeight) - - // Verify the block header was set correctly - currentHeader, 
currentHeight := testItems.blockAssembler.CurrentBlock() - assert.Equal(t, newHeader, currentHeader) - assert.Equal(t, newHeight, currentHeight) - }) -} - // TestBlockAssembly_CoinbaseCalculationFix specifically targets issue #3968 // This test ensures coinbase value never exceeds fees + subsidy by exactly 1 satoshi func TestBlockAssembly_CoinbaseCalculationFix(t *testing.T) { @@ -2500,31 +2424,3 @@ func TestGetMiningCandidate_SendTimeoutResetsGenerationFlag(t *testing.T) { assert.Nil(t, ba.cachedCandidate.generationChan, "generation channel should still be nil") ba.cachedCandidate.mu.RUnlock() } - -// TestGetLastPersistedHeight tests the GetLastPersistedHeight method -func TestGetLastPersistedHeight(t *testing.T) { - initPrometheusMetrics() - - t.Run("GetLastPersistedHeight returns initial zero value", func(t *testing.T) { - testItems := setupBlockAssemblyTest(t) - require.NotNil(t, testItems) - ba := testItems.blockAssembler - - // Initially should be 0 - height := ba.GetLastPersistedHeight() - assert.Equal(t, uint32(0), height) - }) - - t.Run("GetLastPersistedHeight returns updated value", func(t *testing.T) { - testItems := setupBlockAssemblyTest(t) - require.NotNil(t, testItems) - ba := testItems.blockAssembler - - // Store a value - ba.lastPersistedHeight.Store(uint32(100)) - - // Should return the stored value - height := ba.GetLastPersistedHeight() - assert.Equal(t, uint32(100), height) - }) -} diff --git a/services/cleanup/CleanupWorker.go b/services/cleanup/CleanupWorker.go new file mode 100644 index 000000000..1714b7030 --- /dev/null +++ b/services/cleanup/CleanupWorker.go @@ -0,0 +1,215 @@ +package cleanup + +import ( + "context" + "time" + + "github.com/bsv-blockchain/teranode/stores/utxo" +) + +// pollingWorker polls the block assembly service every polling interval to check +// if cleanup should be triggered. It queues cleanup when: +// 1. Block height has changed since last processed +// 2. Cleanup channel is available (no cleanup currently in progress) +// +// Safety checks (block assembly state) are performed in cleanupProcessor to eliminate +// race conditions where state could change between queueing and execution. 
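For reference, the queueing discipline described in the comment above (poll on a ticker, skip when there is no new height, hand off with a non-blocking send so at most one request waits while another runs) reduces to a small self-contained pattern. A minimal sketch, with a simulated height source standing in for the polled block assembly state and purely illustrative names, none of which are teranode APIs:

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

func main() {
    var lastProcessed atomic.Uint32
    var currentHeight atomic.Uint32

    // Buffer of 1: at most one cleanup request can wait while another runs.
    cleanupCh := make(chan uint32, 1)

    // Simulated chain tip, standing in for GetBlockAssemblyState.
    go func() {
        for {
            currentHeight.Add(1)
            time.Sleep(50 * time.Millisecond)
        }
    }()

    // Consumer standing in for cleanupProcessor; deliberately slower than the poller.
    go func() {
        for h := range cleanupCh {
            time.Sleep(120 * time.Millisecond)
            lastProcessed.Store(h)
            fmt.Println("processed height", h)
        }
    }()

    ticker := time.NewTicker(30 * time.Millisecond)
    defer ticker.Stop()

    for i := 0; i < 20; i++ {
        <-ticker.C
        h := currentHeight.Load()
        if h <= lastProcessed.Load() {
            continue // nothing new since the last completed cleanup
        }
        select {
        case cleanupCh <- h: // queued for the processor
        default: // a cleanup is already queued or running; skip this tick
        }
    }
}

Because the send never blocks, a slow cleanup simply causes polls to be skipped rather than work piling up, which is the behaviour recorded by the "already_in_progress" metric label.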
+func (s *Server) pollingWorker(ctx context.Context) { + ticker := time.NewTicker(s.settings.Cleanup.PollingInterval) + defer ticker.Stop() + + s.logger.Infof("Starting cleanup polling worker (interval: %v)", s.settings.Cleanup.PollingInterval) + + for { + select { + case <-ctx.Done(): + s.logger.Infof("Stopping cleanup polling worker") + return + + case <-ticker.C: + // Poll block assembly state to get current height + state, err := s.blockAssemblyClient.GetBlockAssemblyState(ctx) + if err != nil { + s.logger.Errorf("Failed to get block assembly state: %v", err) + cleanupErrors.WithLabelValues("poll").Inc() + continue + } + + if state.CurrentHeight <= s.lastProcessedHeight.Load() { + s.logger.Debugf("Skipping cleanup: no new height (current: %d, last processed: %d)", + state.CurrentHeight, s.lastProcessedHeight.Load()) + cleanupSkipped.WithLabelValues("no_new_height").Inc() + continue + } + + // Try to queue cleanup (non-blocking) + // Safety checks are performed in cleanupProcessor to avoid race conditions + select { + case s.cleanupCh <- state.CurrentHeight: + s.logger.Debugf("Queued cleanup for height %d", state.CurrentHeight) + default: + s.logger.Infof("Cleanup already in progress, skipping height %d", state.CurrentHeight) + cleanupSkipped.WithLabelValues("already_in_progress").Inc() + } + } + } +} + +// cleanupProcessor processes cleanup requests from the cleanup channel. +// It drains the channel to get the latest height (deduplication), then performs +// cleanup in three sequential steps: +// 1. Preserve parents of old unmined transactions +// 2. Process expired preservations (convert preserve_until to delete_at_height) +// 3. Delete-at-height (DAH) cleanup +// +// Safety checks (block assembly state) are performed immediately before each phase +// to prevent race conditions where state could change between queueing and execution. +// +// This goroutine ensures only one cleanup operation runs at a time by processing +// from a buffered channel (size 1). 
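The drain-to-latest deduplication described above is what keeps the processor from replaying every intermediate height during catchup; only the newest queued height matters, since DAH cleanup deletes records marked for deletion at or before that height. A minimal sketch of just that step, with illustrative names:

package main

import "fmt"

// latestFrom drains ch without blocking and returns the newest value,
// starting from a height that has already been received.
func latestFrom(first uint32, ch chan uint32) uint32 {
    latest := first
    for {
        select {
        case h := <-ch:
            latest = h
        default:
            return latest
        }
    }
}

func main() {
    ch := make(chan uint32, 8)
    for _, h := range []uint32{101, 102, 105} {
        ch <- h
    }
    first := <-ch
    fmt.Println(latestFrom(first, ch)) // prints 105; 101 and 102 are skipped
}

In the processor below the drained height then feeds the preserve-parents and DAH phases instead of a print.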
+func (s *Server) cleanupProcessor(ctx context.Context) { + s.logger.Infof("Starting cleanup processor") + + for { + select { + case <-ctx.Done(): + s.logger.Infof("Stopping cleanup processor") + return + + case height := <-s.cleanupCh: + // Deduplicate: drain channel and process latest height only + // This is important during block catchup when multiple heights may be queued + latestHeight := height + drained := false + drainLoop: + for { + select { + case nextHeight := <-s.cleanupCh: + latestHeight = nextHeight + drained = true + default: + break drainLoop + } + } + + if drained { + s.logger.Debugf("Deduplicating cleanup operations, skipping to height %d", latestHeight) + } + + // Safety check before preserve parents phase + // Check block assembly state to ensure it's safe to run cleanup + state, err := s.blockAssemblyClient.GetBlockAssemblyState(ctx) + if err != nil { + s.logger.Errorf("Failed to get block assembly state before preserve parents: %v", err) + cleanupErrors.WithLabelValues("state_check").Inc() + continue + } + + if state.BlockAssemblyState != "running" { + s.logger.Infof("Skipping cleanup for height %d: block assembly state is %s (not running)", latestHeight, state.BlockAssemblyState) + cleanupSkipped.WithLabelValues("not_running").Inc() + continue + } + + // Step 1: Preserve parents of old unmined transactions FIRST + // This ensures parents of old unmined transactions are not deleted by DAH cleanup + // CRITICAL: If this phase fails, we MUST NOT proceed to subsequent phases, + // as DAH cleanup could delete parents that should be preserved. + s.logger.Infof("Starting cleanup for height %d: preserving parents", latestHeight) + startTime := time.Now() + + if s.utxoStore != nil { + _, err := utxo.PreserveParentsOfOldUnminedTransactions( + ctx, s.utxoStore, latestHeight, s.settings, s.logger) + if err != nil { + s.logger.Errorf("CRITICAL: Failed to preserve parents at height %d, ABORTING cleanup to prevent data loss: %v", latestHeight, err) + cleanupErrors.WithLabelValues("preserve_parents_failed").Inc() + cleanupSkipped.WithLabelValues("preserve_failed").Inc() + // ABORT: Do not proceed to Phase 2 or 3 - could cause data loss + continue + } + cleanupDuration.WithLabelValues("preserve_parents").Observe(time.Since(startTime).Seconds()) + } + + // Safety check before process expired preservations phase + // Recheck block assembly state to ensure it hasn't changed (e.g., to reorg) + state, err = s.blockAssemblyClient.GetBlockAssemblyState(ctx) + if err != nil { + s.logger.Errorf("Failed to get block assembly state before process expired preservations: %v", err) + cleanupErrors.WithLabelValues("state_check").Inc() + continue + } + + if state.BlockAssemblyState != "running" { + s.logger.Infof("Skipping process expired preservations for height %d: block assembly state changed to %s (not running)", latestHeight, state.BlockAssemblyState) + cleanupSkipped.WithLabelValues("not_running").Inc() + continue + } + + // Safety check before DAH cleanup phase + // Recheck block assembly state to ensure it hasn't changed (e.g., to reorg) + state, err = s.blockAssemblyClient.GetBlockAssemblyState(ctx) + if err != nil { + s.logger.Errorf("Failed to get block assembly state before DAH cleanup: %v", err) + cleanupErrors.WithLabelValues("state_check").Inc() + continue + } + + if state.BlockAssemblyState != "running" { + s.logger.Infof("Skipping DAH cleanup for height %d: block assembly state changed to %s (not running)", latestHeight, state.BlockAssemblyState) + 
cleanupSkipped.WithLabelValues("not_running").Inc() + continue + } + + // Step 2: Then trigger DAH cleanup and WAIT for it to complete + // DAH cleanup deletes transactions marked for deletion at or before the current height + if s.cleanupService != nil { + s.logger.Infof("Starting cleanup for height %d: DAH cleanup", latestHeight) + startTime = time.Now() + doneCh := make(chan string, 1) + + if err := s.cleanupService.UpdateBlockHeight(latestHeight, doneCh); err != nil { + s.logger.Errorf("Cleanup service error updating block height %d: %v", latestHeight, err) + cleanupErrors.WithLabelValues("dah_cleanup").Inc() + continue + } + + // Wait for cleanup to complete with timeout + cleanupTimeout := s.settings.Cleanup.JobTimeout + timeoutTimer := time.NewTimer(cleanupTimeout) + defer timeoutTimer.Stop() + + select { + case status := <-doneCh: + if status != "completed" { + s.logger.Warnf("Cleanup for height %d finished with status: %s", latestHeight, status) + cleanupErrors.WithLabelValues("dah_cleanup").Inc() + } else { + s.logger.Infof("Cleanup for height %d completed successfully", latestHeight) + cleanupDuration.WithLabelValues("dah_cleanup").Observe(time.Since(startTime).Seconds()) + cleanupProcessed.Inc() + } + case <-timeoutTimer.C: + s.logger.Infof("Cleanup for height %d exceeded coordinator timeout of %v - cleanup continues in background, re-queuing immediately", latestHeight, cleanupTimeout) + // Note: This is not an error - the cleanup job continues processing in the background. + // The coordinator re-queues immediately to check again without waiting for the next polling interval. + // Very large cleanups may take longer than the timeout and require multiple iterations. + + // Immediately re-queue to check again (non-blocking) + select { + case s.cleanupCh <- latestHeight: + s.logger.Debugf("Re-queued cleanup for height %d after timeout", latestHeight) + default: + // Channel full, will be picked up by next poll anyway + s.logger.Debugf("Cleanup channel full, will retry on next poll") + } + case <-ctx.Done(): + return + } + } + + // Update last processed height atomically + s.lastProcessedHeight.Store(latestHeight) + } + } +} diff --git a/services/cleanup/Server.go b/services/cleanup/Server.go new file mode 100644 index 000000000..c75f4bcfb --- /dev/null +++ b/services/cleanup/Server.go @@ -0,0 +1,272 @@ +// Package cleanup provides the Cleanup Service which handles periodic cleanup of unmined transaction +// parents and delete-at-height (DAH) records in the UTXO store. It polls the Block Assembly service +// state and triggers cleanup operations only when safe to do so (i.e., when block assembly is in +// "running" state and not performing reorgs or resets). 
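Once deployed, the only RPC this diff exposes on the service is the health endpoint generated in services/cleanup/cleanup_api. A hedged sketch of probing it from another process; the target address (the default CLEANUP_GRPC_PORT of 8096), the insecure transport and the standalone main are illustrative assumptions, while NewCleanupAPIClient, EmptyMessage and HealthGRPC come from the generated package later in this diff:

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/bsv-blockchain/teranode/services/cleanup/cleanup_api"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

func main() {
    // Assumes the cleanup service is listening on the default cleanup_grpcListenAddress.
    conn, err := grpc.NewClient("localhost:8096",
        grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatalf("connect to cleanup service: %v", err)
    }
    defer conn.Close()

    client := cleanup_api.NewCleanupAPIClient(conn)

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // HealthGRPC maps to Server.HealthGRPC, which runs the readiness checks.
    resp, err := client.HealthGRPC(ctx, &cleanup_api.EmptyMessage{})
    if err != nil {
        log.Fatalf("health check failed: %v", err)
    }
    fmt.Printf("ok=%v details=%s\n", resp.Ok, resp.Details)
}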
+package cleanup + +import ( + "context" + "encoding/binary" + "net/http" + "strconv" + "sync" + "sync/atomic" + + "github.com/bsv-blockchain/teranode/errors" + "github.com/bsv-blockchain/teranode/model" + "github.com/bsv-blockchain/teranode/services/blockassembly" + "github.com/bsv-blockchain/teranode/services/blockchain" + "github.com/bsv-blockchain/teranode/services/cleanup/cleanup_api" + "github.com/bsv-blockchain/teranode/settings" + "github.com/bsv-blockchain/teranode/stores/blob" + "github.com/bsv-blockchain/teranode/stores/cleanup" + "github.com/bsv-blockchain/teranode/stores/utxo" + "github.com/bsv-blockchain/teranode/ulogger" + "github.com/bsv-blockchain/teranode/util" + "github.com/bsv-blockchain/teranode/util/health" + "github.com/ordishs/gocore" + "google.golang.org/grpc" +) + +// Server implements the Cleanup service which handles periodic cleanup operations +// for the UTXO store. It polls block assembly state and triggers cleanup when safe. +type Server struct { + cleanup_api.UnsafeCleanupAPIServer + + // Dependencies (injected via constructor) + ctx context.Context + logger ulogger.Logger + settings *settings.Settings + utxoStore utxo.Store + blockchainClient blockchain.ClientI + blockAssemblyClient blockassembly.ClientI + blobStore blob.Store + + // Internal state + cleanupService cleanup.Service + lastProcessedHeight atomic.Uint32 + lastPersistedHeight atomic.Uint32 + cleanupCh chan uint32 + stats *gocore.Stat +} + +// New creates a new Cleanup server instance with the provided dependencies. +// This function initializes the server but does not start any background processes. +// Call Init() and then Start() to begin operation. +func New( + ctx context.Context, + logger ulogger.Logger, + tSettings *settings.Settings, + utxoStore utxo.Store, + blockchainClient blockchain.ClientI, + blockAssemblyClient blockassembly.ClientI, + blobStore blob.Store, +) *Server { + return &Server{ + ctx: ctx, + logger: logger, + settings: tSettings, + utxoStore: utxoStore, + blockchainClient: blockchainClient, + blockAssemblyClient: blockAssemblyClient, + blobStore: blobStore, + stats: gocore.NewStat("cleanup"), + } +} + +// Init initializes the cleanup service. This is called before Start() and is responsible +// for setting up the cleanup service provider from the UTXO store and subscribing to +// block persisted notifications for coordination with the block persister service. 
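Init below recovers the block persister's progress by reading the "BlockPersisterHeight" blob from blockchain state and decoding it as four little-endian bytes. A tiny sketch of that round trip; the write side is an assumption implied by the decode, since this diff only shows the read path:

package main

import (
    "encoding/binary"
    "fmt"
)

func main() {
    // Encode a height the way the decode in Init expects to find it.
    state := make([]byte, 4)
    binary.LittleEndian.PutUint32(state, 895_432)

    // Decode exactly as Init does: require at least 4 bytes, read little-endian.
    if len(state) >= 4 {
        height := binary.LittleEndian.Uint32(state)
        fmt.Println("last persisted height:", height) // 895432
    }
}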
+func (s *Server) Init(ctx context.Context) error { + s.ctx = ctx + + // Initialize metrics + initPrometheusMetrics() + + // Initialize cleanup service from UTXO store + cleanupProvider, ok := s.utxoStore.(cleanup.CleanupServiceProvider) + if !ok { + return errors.NewServiceError("UTXO store does not provide cleanup service") + } + + var err error + s.cleanupService, err = cleanupProvider.GetCleanupService() + if err != nil { + return errors.NewServiceError("failed to get cleanup service", err) + } + if s.cleanupService == nil { + return errors.NewServiceError("cleanup service not available from UTXO store") + } + + // Set persisted height getter for block persister coordination + s.cleanupService.SetPersistedHeightGetter(s.GetLastPersistedHeight) + + // Subscribe to BlockPersisted notifications using blockchain client + // The Subscribe method returns a channel, but we'll handle notifications in the polling worker + // For now, we'll track persisted height via blockchain state and notifications separately + subscriptionCh, err := s.blockchainClient.Subscribe(ctx, "Cleanup") + if err != nil { + return errors.NewServiceError("failed to subscribe to blockchain notifications", err) + } + + // Start a goroutine to handle BlockPersisted notifications + go func() { + for notification := range subscriptionCh { + if notification.Type == model.NotificationType_BlockPersisted { + if notification.Metadata != nil && notification.Metadata.Metadata != nil { + if heightStr, ok := notification.Metadata.Metadata["height"]; ok { + if height, err := strconv.ParseUint(heightStr, 10, 32); err == nil { + s.lastPersistedHeight.Store(uint32(height)) + s.logger.Debugf("Updated persisted height to %d", height) + } + } + } + } + } + }() + + // Read initial persisted height from blockchain state + if state, err := s.blockchainClient.GetState(ctx, "BlockPersisterHeight"); err == nil && len(state) >= 4 { + height := binary.LittleEndian.Uint32(state) + s.lastPersistedHeight.Store(height) + s.logger.Infof("Loaded initial block persister height: %d", height) + } + + return nil +} + +// Start begins the cleanup service operation. It starts the polling worker and cleanup +// processor goroutines, then starts the gRPC server. This function blocks until the +// server shuts down or encounters an error. +func (s *Server) Start(ctx context.Context, readyCh chan<- struct{}) error { + var closeOnce sync.Once + defer closeOnce.Do(func() { close(readyCh) }) + + // Wait for blockchain FSM to be ready + err := s.blockchainClient.WaitUntilFSMTransitionFromIdleState(ctx) + if err != nil { + return err + } + + // Initialize cleanup channel (buffer of 1 to prevent blocking while ensuring only one cleanup) + s.cleanupCh = make(chan uint32, 1) + + // Start the cleanup service (Aerospike or SQL) + if s.cleanupService != nil { + s.cleanupService.Start(ctx) + } + + // Start cleanup processor goroutine + go s.cleanupProcessor(ctx) + + // Start polling worker goroutine + go s.pollingWorker(ctx) + + // Start gRPC server (BLOCKING - must be last) + if err := util.StartGRPCServer(ctx, s.logger, s.settings, "cleanup", + s.settings.Cleanup.GRPCListenAddress, + func(server *grpc.Server) { + cleanup_api.RegisterCleanupAPIServer(server, s) + closeOnce.Do(func() { close(readyCh) }) + }, nil); err != nil { + return err + } + + return nil +} + +// Stop gracefully shuts down the cleanup service. Context cancellation will stop +// the polling worker and cleanup processor goroutines. 
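Stop below only calls Stop on the store's cleanup service when the concrete implementation actually provides one (the in-code comment notes Aerospike does and SQL does not), using an anonymous interface assertion. A minimal standalone sketch of that optional-interface pattern, with made-up types:

package main

import (
    "context"
    "fmt"
)

type withStop struct{}

func (withStop) Stop(ctx context.Context) error { return nil }

type withoutStop struct{}

// stopIfSupported mirrors the assertion in Server.Stop: only call Stop when
// the value really implements it, otherwise do nothing.
func stopIfSupported(ctx context.Context, v any) {
    type stopper interface {
        Stop(ctx context.Context) error
    }
    if s, ok := v.(stopper); ok {
        fmt.Println("stopped, err =", s.Stop(ctx))
        return
    }
    fmt.Println("no Stop method, nothing to do")
}

func main() {
    ctx := context.Background()
    stopIfSupported(ctx, withStop{})
    stopIfSupported(ctx, withoutStop{})
}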
+func (s *Server) Stop(ctx context.Context) error { + // Stop the cleanup service if it has a Stop method + if s.cleanupService != nil { + // Check if the cleanup service implements Stop + // Aerospike has Stop, SQL doesn't + type stopper interface { + Stop(ctx context.Context) error + } + if stoppable, ok := s.cleanupService.(stopper); ok { + if err := stoppable.Stop(ctx); err != nil { + s.logger.Errorf("Error stopping cleanup service: %v", err) + } + } + } + + // Context cancellation will stop goroutines + s.logger.Infof("Cleanup service stopped") + return nil +} + +// Health implements the health check for the cleanup service. When checkLiveness is true, +// it only checks if the service process is running. When false, it checks all dependencies +// including gRPC server, block assembly client, blockchain client, and UTXO store. +func (s *Server) Health(ctx context.Context, checkLiveness bool) (int, string, error) { + if checkLiveness { + // LIVENESS: Is the service process running? + return http.StatusOK, "OK", nil + } + + // READINESS: Can the service handle requests? + checks := make([]health.Check, 0, 5) + + // Check gRPC server is listening + if s.settings.Cleanup.GRPCListenAddress != "" { + checks = append(checks, health.Check{ + Name: "gRPC Server", + Check: health.CheckGRPCServerWithSettings(s.settings.Cleanup.GRPCListenAddress, s.settings, func(ctx context.Context, conn *grpc.ClientConn) error { + // Simple connection check - if we can create a client, server is up + return nil + }), + }) + } + + // Check block assembly client + if s.blockAssemblyClient != nil { + checks = append(checks, health.Check{ + Name: "BlockAssemblyClient", + Check: s.blockAssemblyClient.Health, + }) + } + + // Check blockchain client + if s.blockchainClient != nil { + checks = append(checks, health.Check{ + Name: "BlockchainClient", + Check: s.blockchainClient.Health, + }) + checks = append(checks, health.Check{ + Name: "FSM", + Check: blockchain.CheckFSM(s.blockchainClient), + }) + } + + // Check UTXO store + if s.utxoStore != nil { + checks = append(checks, health.Check{ + Name: "UTXOStore", + Check: s.utxoStore.Health, + }) + } + + return health.CheckAll(ctx, checkLiveness, checks) +} + +// HealthGRPC implements the gRPC health check endpoint. +func (s *Server) HealthGRPC(ctx context.Context, _ *cleanup_api.EmptyMessage) (*cleanup_api.HealthResponse, error) { + // Add context value to prevent circular dependency when checking gRPC server + ctx = context.WithValue(ctx, "skip-grpc-self-check", true) + + status, details, err := s.Health(ctx, false) + + return &cleanup_api.HealthResponse{ + Ok: status == http.StatusOK, + Details: details, + }, errors.WrapGRPC(err) +} + +// GetLastPersistedHeight returns the last known block height that has been persisted +// by the block persister service. This is used to coordinate cleanup operations to +// avoid deleting data that the block persister still needs. +func (s *Server) GetLastPersistedHeight() uint32 { + return s.lastPersistedHeight.Load() +} diff --git a/services/cleanup/cleanup_api/cleanup_api.pb.go b/services/cleanup/cleanup_api/cleanup_api.pb.go new file mode 100644 index 000000000..f81744754 --- /dev/null +++ b/services/cleanup/cleanup_api/cleanup_api.pb.go @@ -0,0 +1,175 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. 
+// versions: +// protoc-gen-go v1.36.10 +// protoc v6.33.0 +// source: services/cleanup/cleanup_api/cleanup_api.proto + +package cleanup_api + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type EmptyMessage struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *EmptyMessage) Reset() { + *x = EmptyMessage{} + mi := &file_services_cleanup_cleanup_api_cleanup_api_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *EmptyMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EmptyMessage) ProtoMessage() {} + +func (x *EmptyMessage) ProtoReflect() protoreflect.Message { + mi := &file_services_cleanup_cleanup_api_cleanup_api_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EmptyMessage.ProtoReflect.Descriptor instead. +func (*EmptyMessage) Descriptor() ([]byte, []int) { + return file_services_cleanup_cleanup_api_cleanup_api_proto_rawDescGZIP(), []int{0} +} + +type HealthResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Ok bool `protobuf:"varint,1,opt,name=ok,proto3" json:"ok,omitempty"` + Details string `protobuf:"bytes,2,opt,name=details,proto3" json:"details,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *HealthResponse) Reset() { + *x = HealthResponse{} + mi := &file_services_cleanup_cleanup_api_cleanup_api_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *HealthResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HealthResponse) ProtoMessage() {} + +func (x *HealthResponse) ProtoReflect() protoreflect.Message { + mi := &file_services_cleanup_cleanup_api_cleanup_api_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HealthResponse.ProtoReflect.Descriptor instead. 
+func (*HealthResponse) Descriptor() ([]byte, []int) { + return file_services_cleanup_cleanup_api_cleanup_api_proto_rawDescGZIP(), []int{1} +} + +func (x *HealthResponse) GetOk() bool { + if x != nil { + return x.Ok + } + return false +} + +func (x *HealthResponse) GetDetails() string { + if x != nil { + return x.Details + } + return "" +} + +var File_services_cleanup_cleanup_api_cleanup_api_proto protoreflect.FileDescriptor + +const file_services_cleanup_cleanup_api_cleanup_api_proto_rawDesc = "" + + "\n" + + ".services/cleanup/cleanup_api/cleanup_api.proto\x12\acleanup\"\x0e\n" + + "\fEmptyMessage\":\n" + + "\x0eHealthResponse\x12\x0e\n" + + "\x02ok\x18\x01 \x01(\bR\x02ok\x12\x18\n" + + "\adetails\x18\x02 \x01(\tR\adetails2L\n" + + "\n" + + "CleanupAPI\x12>\n" + + "\n" + + "HealthGRPC\x12\x15.cleanup.EmptyMessage\x1a\x17.cleanup.HealthResponse\"\x00BAZ?github.com/bsv-blockchain/teranode/services/cleanup/cleanup_apib\x06proto3" + +var ( + file_services_cleanup_cleanup_api_cleanup_api_proto_rawDescOnce sync.Once + file_services_cleanup_cleanup_api_cleanup_api_proto_rawDescData []byte +) + +func file_services_cleanup_cleanup_api_cleanup_api_proto_rawDescGZIP() []byte { + file_services_cleanup_cleanup_api_cleanup_api_proto_rawDescOnce.Do(func() { + file_services_cleanup_cleanup_api_cleanup_api_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_services_cleanup_cleanup_api_cleanup_api_proto_rawDesc), len(file_services_cleanup_cleanup_api_cleanup_api_proto_rawDesc))) + }) + return file_services_cleanup_cleanup_api_cleanup_api_proto_rawDescData +} + +var file_services_cleanup_cleanup_api_cleanup_api_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_services_cleanup_cleanup_api_cleanup_api_proto_goTypes = []any{ + (*EmptyMessage)(nil), // 0: cleanup.EmptyMessage + (*HealthResponse)(nil), // 1: cleanup.HealthResponse +} +var file_services_cleanup_cleanup_api_cleanup_api_proto_depIdxs = []int32{ + 0, // 0: cleanup.CleanupAPI.HealthGRPC:input_type -> cleanup.EmptyMessage + 1, // 1: cleanup.CleanupAPI.HealthGRPC:output_type -> cleanup.HealthResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_services_cleanup_cleanup_api_cleanup_api_proto_init() } +func file_services_cleanup_cleanup_api_cleanup_api_proto_init() { + if File_services_cleanup_cleanup_api_cleanup_api_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_services_cleanup_cleanup_api_cleanup_api_proto_rawDesc), len(file_services_cleanup_cleanup_api_cleanup_api_proto_rawDesc)), + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_services_cleanup_cleanup_api_cleanup_api_proto_goTypes, + DependencyIndexes: file_services_cleanup_cleanup_api_cleanup_api_proto_depIdxs, + MessageInfos: file_services_cleanup_cleanup_api_cleanup_api_proto_msgTypes, + }.Build() + File_services_cleanup_cleanup_api_cleanup_api_proto = out.File + file_services_cleanup_cleanup_api_cleanup_api_proto_goTypes = nil + file_services_cleanup_cleanup_api_cleanup_api_proto_depIdxs = nil +} diff --git a/services/cleanup/cleanup_api/cleanup_api.proto b/services/cleanup/cleanup_api/cleanup_api.proto new file 
mode 100644 index 000000000..1c344f7dc --- /dev/null +++ b/services/cleanup/cleanup_api/cleanup_api.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +package cleanup; + +option go_package = "github.com/bsv-blockchain/teranode/services/cleanup/cleanup_api"; + +service CleanupAPI { + rpc HealthGRPC(EmptyMessage) returns (HealthResponse) {} +} + +message EmptyMessage {} + +message HealthResponse { + bool ok = 1; + string details = 2; +} diff --git a/services/cleanup/cleanup_api/cleanup_api_grpc.pb.go b/services/cleanup/cleanup_api/cleanup_api_grpc.pb.go new file mode 100644 index 000000000..f42fd4715 --- /dev/null +++ b/services/cleanup/cleanup_api/cleanup_api_grpc.pb.go @@ -0,0 +1,121 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.5.1 +// - protoc v6.33.0 +// source: services/cleanup/cleanup_api/cleanup_api.proto + +package cleanup_api + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + CleanupAPI_HealthGRPC_FullMethodName = "/cleanup.CleanupAPI/HealthGRPC" +) + +// CleanupAPIClient is the client API for CleanupAPI service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type CleanupAPIClient interface { + HealthGRPC(ctx context.Context, in *EmptyMessage, opts ...grpc.CallOption) (*HealthResponse, error) +} + +type cleanupAPIClient struct { + cc grpc.ClientConnInterface +} + +func NewCleanupAPIClient(cc grpc.ClientConnInterface) CleanupAPIClient { + return &cleanupAPIClient{cc} +} + +func (c *cleanupAPIClient) HealthGRPC(ctx context.Context, in *EmptyMessage, opts ...grpc.CallOption) (*HealthResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(HealthResponse) + err := c.cc.Invoke(ctx, CleanupAPI_HealthGRPC_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// CleanupAPIServer is the server API for CleanupAPI service. +// All implementations must embed UnimplementedCleanupAPIServer +// for forward compatibility. +type CleanupAPIServer interface { + HealthGRPC(context.Context, *EmptyMessage) (*HealthResponse, error) + mustEmbedUnimplementedCleanupAPIServer() +} + +// UnimplementedCleanupAPIServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedCleanupAPIServer struct{} + +func (UnimplementedCleanupAPIServer) HealthGRPC(context.Context, *EmptyMessage) (*HealthResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method HealthGRPC not implemented") +} +func (UnimplementedCleanupAPIServer) mustEmbedUnimplementedCleanupAPIServer() {} +func (UnimplementedCleanupAPIServer) testEmbeddedByValue() {} + +// UnsafeCleanupAPIServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to CleanupAPIServer will +// result in compilation errors. 
+type UnsafeCleanupAPIServer interface { + mustEmbedUnimplementedCleanupAPIServer() +} + +func RegisterCleanupAPIServer(s grpc.ServiceRegistrar, srv CleanupAPIServer) { + // If the following call pancis, it indicates UnimplementedCleanupAPIServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&CleanupAPI_ServiceDesc, srv) +} + +func _CleanupAPI_HealthGRPC_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(EmptyMessage) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CleanupAPIServer).HealthGRPC(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: CleanupAPI_HealthGRPC_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CleanupAPIServer).HealthGRPC(ctx, req.(*EmptyMessage)) + } + return interceptor(ctx, in, info, handler) +} + +// CleanupAPI_ServiceDesc is the grpc.ServiceDesc for CleanupAPI service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var CleanupAPI_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "cleanup.CleanupAPI", + HandlerType: (*CleanupAPIServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "HealthGRPC", + Handler: _CleanupAPI_HealthGRPC_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "services/cleanup/cleanup_api/cleanup_api.proto", +} diff --git a/services/cleanup/metrics.go b/services/cleanup/metrics.go new file mode 100644 index 000000000..10b5004b6 --- /dev/null +++ b/services/cleanup/metrics.go @@ -0,0 +1,45 @@ +package cleanup + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + cleanupDuration = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "cleanup_duration_seconds", + Help: "Duration of cleanup operations in seconds", + Buckets: prometheus.ExponentialBuckets(1, 2, 10), // 1s to ~17 minutes + }, + []string{"operation"}, // "preserve_parents" or "dah_cleanup" + ) + + cleanupSkipped = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "cleanup_skipped_total", + Help: "Number of cleanup operations skipped", + }, + []string{"reason"}, // "not_running", "no_new_height", "already_in_progress" + ) + + cleanupProcessed = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "cleanup_processed_total", + Help: "Total number of successful cleanup operations", + }, + ) + + cleanupErrors = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "cleanup_errors_total", + Help: "Total number of cleanup errors", + }, + []string{"operation"}, // "preserve_parents", "dah_cleanup", "poll" + ) +) + +func initPrometheusMetrics() { + // Metrics are auto-registered via promauto + // This function exists for consistency with other services +} diff --git a/settings.conf b/settings.conf index 4fdf0649c..11eed49de 100644 --- a/settings.conf +++ b/settings.conf @@ -121,6 +121,7 @@ BLOCK_ASSEMBLY_GRPC_PORT = 8085 BLOCK_PERSISTER_HTTP_PORT = 8083 BLOCK_VALIDATION_GRPC_PORT = 8088 CENTRIFUGE_PORT = 8892 +CLEANUP_GRPC_PORT = 8096 COINBASE_GRPC_PORT = 8093 FAUCET_HTTP_PORT = 8097 
HEALTH_CHECK_PORT = 8000 @@ -168,6 +169,7 @@ aerospike_host.docker.ci.teranode3 = aerospike-3 aerospike_batchPolicy = aerospike:///?MaxRetries=5&SleepBetweenRetries=500ms&SleepMultiplier=1&TotalTimeout=64s&SocketTimeout=10s&ConcurrentNodes=0 aerospike_readPolicy = aerospike:///?MaxRetries=5&SleepBetweenRetries=500ms&SleepMultiplier=1&TotalTimeout=1s&SocketTimeout=1s aerospike_writePolicy = aerospike:///?MaxRetries=5&SleepBetweenRetries=500ms&SleepMultiplier=1&TotalTimeout=1s&SocketTimeout=1s +aerospike_queryPolicy = aerospike:///?MaxRetries=3&SleepBetweenRetries=500ms&SleepMultiplier=1&TotalTimeout=30m&SocketTimeout=25m # @endgroup aerospike_port.docker.teranode1 = 3100 @@ -549,6 +551,26 @@ coinbase_wallet_private_key.docker.host.teranode1 = ${PK1} coinbase_wallet_private_key.docker.host.teranode2 = ${PK2} coinbase_wallet_private_key.docker.host.teranode3 = ${PK3} +# Cleanup Service Configuration +# ------------------------------ +cleanup_grpcAddress = localhost:${CLEANUP_GRPC_PORT} +cleanup_grpcAddress.docker.m = cleanup:${CLEANUP_GRPC_PORT} +cleanup_grpcAddress.docker = ${clientName}:${CLEANUP_GRPC_PORT} +cleanup_grpcAddress.docker.host = localhost:${PORT_PREFIX}${CLEANUP_GRPC_PORT} +cleanup_grpcAddress.operator = k8s:///cleanup.${clientName}.svc.cluster.local:${CLEANUP_GRPC_PORT} + +cleanup_grpcListenAddress = :${CLEANUP_GRPC_PORT} +cleanup_grpcListenAddress.dev = localhost:${CLEANUP_GRPC_PORT} +cleanup_grpcListenAddress.docker.host = localhost:${PORT_PREFIX}${CLEANUP_GRPC_PORT} + +cleanup_pollingInterval = 1m + +# Timeout for waiting for cleanup job completion before coordinator moves on +# Large cleanups (multi-million rows) may exceed this timeout, which is normal +# The cleanup continues in background and will be re-queued immediately +# Default: 10m +cleanup_jobTimeout = 10m + # @group: dashboard # Vite dev server ports (comma-separated) # dashboard_devServerPorts = 5173,4173 @@ -1098,6 +1120,20 @@ startBlockPersister.dev.system.test = false startBlockPersister.operator = false startBlockPersister.teratestnet = true +# Cleanup Service Configuration +# --------------------------------------- +startCleanup = true +startCleanup.docker.host.teranode1.coinbase = false +startCleanup.docker.host.teranode2.coinbase = false +startCleanup.docker.host.teranode3.coinbase = false +startCleanup.docker.teranode1.test.coinbase = false +startCleanup.docker.teranode2.test.coinbase = false +startCleanup.docker.teranode3.test.coinbase = false +startCleanup.docker.m = false +startCleanup.operator = false +startCleanup.dev = true +startCleanup.teratestnet = true + # Block Validation Service Configuration # --------------------------------------- startBlockValidation = true @@ -1453,6 +1489,19 @@ utxostore_storeBatcherDurationMillis = 10 utxostore_storeBatcherSize = 2048 +# Cleanup batcher settings - optimized for multi-million record cleanups +# These settings control batching of operations during delete-at-height (DAH) cleanup +# Larger batch sizes = fewer round-trips to Aerospike = faster cleanup +utxostore_cleanupParentUpdateBatcherSize = 2000 +utxostore_cleanupParentUpdateBatcherDurationMillis = 100 +utxostore_cleanupDeleteBatcherSize = 5000 +utxostore_cleanupDeleteBatcherDurationMillis = 100 + +# Maximum concurrent operations during cleanup (0 = use Aerospike connection queue size) +# Increase this if you have a large connection pool and want faster cleanup +# Default: 0 (auto-detect from connection pool) +utxostore_cleanupMaxConcurrentOperations = 0 + # this value determines if a caching mechanism 
will be used for external transactions # if you have the memory available, it will speed up your IBD # and it is required for large blocks which load in the same tx multiple times, e.g. 814337 @@ -1517,4 +1566,4 @@ COINBASE_GRPC_ADDRESS = COINBASE_HTTP_PORT = COINBASE_WALLET_PRIVATE_KEY.docker.teranode1 = COINBASE_WALLET_PRIVATE_KEY.docker.teranode2 = -COINBASE_WALLET_PRIVATE_KEY.docker.teranode3 = \ No newline at end of file +COINBASE_WALLET_PRIVATE_KEY.docker.teranode3 = diff --git a/settings/interface.go b/settings/interface.go index e5a0a6c62..0e7d68132 100644 --- a/settings/interface.go +++ b/settings/interface.go @@ -67,6 +67,7 @@ type Settings struct { RPC RPCSettings Faucet FaucetSettings Dashboard DashboardSettings + Cleanup CleanupSettings GlobalBlockHeightRetention uint32 } @@ -139,6 +140,7 @@ type AerospikeSettings struct { BatchPolicyURL *url.URL ReadPolicyURL *url.URL WritePolicyURL *url.URL + QueryPolicyURL *url.URL Port int UseDefaultBasePolicies bool UseDefaultPolicies bool @@ -455,6 +457,14 @@ type CoinbaseSettings struct { P2PPort int } +type CleanupSettings struct { + GRPCListenAddress string + GRPCAddress string + PollingInterval time.Duration + WorkerCount int + JobTimeout time.Duration // Timeout for waiting for cleanup job completion +} + type SubtreeValidationSettings struct { QuorumPath string QuorumAbsoluteTimeout time.Duration diff --git a/settings/settings.go b/settings/settings.go index 56f73019c..9b88979a4 100644 --- a/settings/settings.go +++ b/settings/settings.go @@ -136,6 +136,7 @@ func NewSettings(alternativeContext ...string) *Settings { BatchPolicyURL: getURL("aerospike_batchPolicy", "defaultBatchPolicy", alternativeContext...), ReadPolicyURL: getURL("aerospike_readPolicy", "defaultReadPolicy", alternativeContext...), WritePolicyURL: getURL("aerospike_writePolicy", "defaultWritePolicy", alternativeContext...), + QueryPolicyURL: getURL("aerospike_queryPolicy", "defaultQueryPolicy", alternativeContext...), Port: getInt("aerospike_port", 3000, alternativeContext...), UseDefaultBasePolicies: getBool("aerospike_useDefaultBasePolicies", false, alternativeContext...), UseDefaultPolicies: getBool("aerospike_useDefaultPolicies", false, alternativeContext...), @@ -352,11 +353,11 @@ func NewSettings(alternativeContext ...string) *Settings { MaxMinedBatchSize: getInt("utxostore_maxMinedBatchSize", 1024, alternativeContext...), BlockHeightRetentionAdjustment: getInt32("utxostore_blockHeightRetentionAdjustment", 0, alternativeContext...), DisableDAHCleaner: getBool("utxostore_disableDAHCleaner", false, alternativeContext...), - // Cleanup-specific settings with reasonable defaults - CleanupParentUpdateBatcherSize: getInt("utxostore_cleanupParentUpdateBatcherSize", 100, alternativeContext...), - CleanupParentUpdateBatcherDurationMillis: getInt("utxostore_cleanupParentUpdateBatcherDurationMillis", 10, alternativeContext...), - CleanupDeleteBatcherSize: getInt("utxostore_cleanupDeleteBatcherSize", 256, alternativeContext...), - CleanupDeleteBatcherDurationMillis: getInt("utxostore_cleanupDeleteBatcherDurationMillis", 10, alternativeContext...), + // Cleanup-specific settings optimized for multi-million record cleanups + CleanupParentUpdateBatcherSize: getInt("utxostore_cleanupParentUpdateBatcherSize", 2000, alternativeContext...), + CleanupParentUpdateBatcherDurationMillis: getInt("utxostore_cleanupParentUpdateBatcherDurationMillis", 100, alternativeContext...), + CleanupDeleteBatcherSize: getInt("utxostore_cleanupDeleteBatcherSize", 5000, alternativeContext...), + 
CleanupDeleteBatcherDurationMillis: getInt("utxostore_cleanupDeleteBatcherDurationMillis", 100, alternativeContext...), CleanupMaxConcurrentOperations: getInt("utxostore_cleanupMaxConcurrentOperations", 0, alternativeContext...), }, P2P: P2PSettings{ @@ -412,6 +413,13 @@ func NewSettings(alternativeContext ...string) *Settings { TestMode: getBool("coinbase_test_mode", false, alternativeContext...), P2PPort: getInt("p2p_port_coinbase", 9906, alternativeContext...), }, + Cleanup: CleanupSettings{ + GRPCAddress: getString("cleanup_grpcAddress", "localhost:8096", alternativeContext...), + GRPCListenAddress: getString("cleanup_grpcListenAddress", ":8096", alternativeContext...), + PollingInterval: getDuration("cleanup_pollingInterval", time.Minute, alternativeContext...), + WorkerCount: getInt("cleanup_workerCount", 4, alternativeContext...), // Default to 4 workers + JobTimeout: getDuration("cleanup_jobTimeout", 10*time.Minute, alternativeContext...), + }, SubtreeValidation: SubtreeValidationSettings{ QuorumAbsoluteTimeout: getDuration("subtree_quorum_absolute_timeout", 30*time.Second, alternativeContext...), QuorumPath: getString("subtree_quorum_path", "", alternativeContext...), diff --git a/stores/cleanup/job_processor.go b/stores/cleanup/job_processor.go index acc36b162..68553843f 100644 --- a/stores/cleanup/job_processor.go +++ b/stores/cleanup/job_processor.go @@ -125,6 +125,24 @@ func (m *JobManager) Stop() { } m.mu.Unlock() + // Signal any running jobs before waiting for workers + m.jobsMutex.Lock() + for _, job := range m.jobs { + status := job.GetStatus() + if status == JobStatusRunning || status == JobStatusPending { + // Mark as cancelled and signal the channel + job.SetStatus(JobStatusCancelled) + job.Ended = time.Now() + + // Signal the doneCh so waiting goroutines aren't blocked + m.sendAndClose(job.DoneCh, JobStatusCancelled.String()) + + // Cancel the job's context if it has one + job.Cancel() + } + } + m.jobsMutex.Unlock() + // Wait for all workers to exit m.wg.Wait() } diff --git a/stores/utxo/aerospike/aerospike.go b/stores/utxo/aerospike/aerospike.go index 63250e158..9742b7e86 100644 --- a/stores/utxo/aerospike/aerospike.go +++ b/stores/utxo/aerospike/aerospike.go @@ -788,8 +788,8 @@ func (s *Store) ProcessExpiredPreservations(ctx context.Context, currentHeight u queryPolicy := aerospike.NewQueryPolicy() queryPolicy.MaxRetries = 3 - queryPolicy.SocketTimeout = 30 * time.Second - queryPolicy.TotalTimeout = 120 * time.Second + queryPolicy.SocketTimeout = 5 * time.Minute + queryPolicy.TotalTimeout = 30 * time.Minute recordset, err := s.client.Query(queryPolicy, stmt) if err != nil { diff --git a/stores/utxo/aerospike/cleanup/cleanup_service.go b/stores/utxo/aerospike/cleanup/cleanup_service.go index c3c8b2183..30adb0392 100644 --- a/stores/utxo/aerospike/cleanup/cleanup_service.go +++ b/stores/utxo/aerospike/cleanup/cleanup_service.go @@ -101,6 +101,7 @@ type Service struct { queryPolicy *aerospike.QueryPolicy writePolicy *aerospike.WritePolicy batchWritePolicy *aerospike.BatchWritePolicy + batchPolicy *aerospike.BatchPolicy // separate batchers for background batch processing parentUpdateBatcher batcherIfc[batchParentUpdate] @@ -164,20 +165,20 @@ func NewService(tSettings *settings.Settings, opts Options) (*Service, error) { }) }) - queryPolicy := aerospike.NewQueryPolicy() - queryPolicy.IncludeBinData = true - queryPolicy.MaxRetries = 3 - queryPolicy.SocketTimeout = 30 * time.Second - queryPolicy.TotalTimeout = 120 * time.Second + // Use the configured query policy from 
settings (configured via aerospike_queryPolicy URL) + queryPolicy := util.GetAerospikeQueryPolicy(tSettings) + queryPolicy.IncludeBinData = true // Need to include bin data for cleanup processing - writePolicy := aerospike.NewWritePolicy(0, 0) - writePolicy.MaxRetries = 3 - writePolicy.SocketTimeout = 30 * time.Second - writePolicy.TotalTimeout = 120 * time.Second + // Use the configured write policy from settings + writePolicy := util.GetAerospikeWritePolicy(tSettings, 0) - batchWritePolicy := aerospike.NewBatchWritePolicy() + // Use the configured batch policies from settings + batchWritePolicy := util.GetAerospikeBatchWritePolicy(tSettings) batchWritePolicy.RecordExistsAction = aerospike.UPDATE_ONLY + // Use the configured batch policy from settings (configured via aerospike_batchPolicy URL) + batchPolicy := util.GetAerospikeBatchPolicy(tSettings) + // Determine max concurrent operations: // - Use connection queue size as the upper bound (to prevent connection exhaustion) // - If setting is configured (non-zero), use the minimum of setting and connection queue size @@ -202,6 +203,7 @@ func NewService(tSettings *settings.Settings, opts Options) (*Service, error) { queryPolicy: queryPolicy, writePolicy: writePolicy, batchWritePolicy: batchWritePolicy, + batchPolicy: batchPolicy, getPersistedHeight: opts.GetPersistedHeight, maxConcurrentOperations: maxConcurrentOps, } @@ -304,6 +306,9 @@ func (s *Service) processCleanupJob(job *cleanup.Job, workerID int) { job.SetStatus(cleanup.JobStatusRunning) job.Started = time.Now() + // Get the job's context for cancellation support + jobCtx := job.Context() + s.logger.Infof("Worker %d starting cleanup job for block height %d", workerID, job.BlockHeight) // BLOCK PERSISTER COORDINATION: Calculate safe cleanup height @@ -378,11 +383,30 @@ func (s *Service) processCleanupJob(job *cleanup.Job, workerID int) { result := recordset.Results() recordCount := atomic.Int64{} + lastProgressLog := atomic.Int64{} + lastProgressLog.Store(time.Now().Unix()) + progressLogInterval := int64(30) // Log progress every 30 seconds g := &errgroup.Group{} util.SafeSetLimit(g, s.maxConcurrentOperations) + // Log initial start + s.logger.Infof("Worker %d: starting cleanup scan for height %d (delete_at_height <= %d)", + workerID, job.BlockHeight, safeCleanupHeight) + for { + // Check for cancellation before processing next record + select { + case <-jobCtx.Done(): + s.logger.Infof("Worker %d: cleanup job for height %d cancelled, stopping record processing", workerID, job.BlockHeight) + // Close the recordset to stop receiving more records + recordset.Close() + // Break out of the loop to allow graceful shutdown + goto waitForInProgress + default: + // Continue processing + } + rec, ok := <-result if !ok || rec == nil { break // No more records @@ -394,32 +418,88 @@ func (s *Service) processCleanupJob(job *cleanup.Job, workerID int) { return err } - recordCount.Add(1) + count := recordCount.Add(1) + + // Log progress every N records or every X seconds (thread-safe) + now := time.Now().Unix() + lastLog := lastProgressLog.Load() + if count%10000 == 0 || (now-lastLog) > progressLogInterval { + // Try to update last log time - only one goroutine will succeed + if lastProgressLog.CompareAndSwap(lastLog, now) { + s.logger.Infof("Worker %d: cleanup progress for height %d - processed %d records so far", + workerID, job.BlockHeight, count) + } + } return nil }) } - if err := g.Wait(); err != nil { - s.logger.Errorf(err.Error()) - s.markJobAsFailed(job, err) - return +waitForInProgress: + 
// Wait for all in-progress operations to complete (even during cancellation) + // IMPORTANT: Capture the error but DON'T return immediately - we need to flush batchers! + processingErr := g.Wait() + if processingErr != nil { + s.logger.Errorf("Worker %d: error during cleanup processing for height %d: %v", workerID, job.BlockHeight, processingErr) + } + + // Check if we were cancelled + wasCancelled := false + select { + case <-jobCtx.Done(): + wasCancelled = true + s.logger.Infof("Worker %d: cleanup job for height %d was cancelled", workerID, job.BlockHeight) + default: + // Not cancelled } s.logger.Infof("Worker %d: processed %d records for cleanup job %d", workerID, recordCount.Load(), job.BlockHeight) - job.SetStatus(cleanup.JobStatusCompleted) - job.Ended = time.Now() + // CRITICAL: Always flush batchers, even on error! + // This ensures that work already queued gets committed to the database. + // Without this, timeouts or errors would cause data loss of already processed records. + s.logger.Debugf("Worker %d: flushing batchers for cleanup job %d (error: %v)", workerID, job.BlockHeight, processingErr != nil) - prometheusUtxoCleanupBatch.Observe(float64(time.Since(job.Started).Microseconds()) / 1_000_000) + // Flush delete batcher to ensure all pending deletions are executed + if s.deleteBatcher != nil { + s.deleteBatcher.Trigger() + } + + // Flush parent update batcher to ensure all pending parent updates are executed + if s.parentUpdateBatcher != nil { + s.parentUpdateBatcher.Trigger() + } + + s.logger.Debugf("Worker %d: batchers flushed for cleanup job %d", workerID, job.BlockHeight) - if job.DoneCh != nil { - job.DoneCh <- cleanup.JobStatusCompleted.String() - close(job.DoneCh) + // Set appropriate status based on errors and cancellation + if processingErr != nil { + // Had an error during processing, but batchers were still flushed + job.SetStatus(cleanup.JobStatusFailed) + job.Error = processingErr + s.logger.Warnf("Worker %d: cleanup job for height %d failed with error, but %d records were still committed to database", + workerID, job.BlockHeight, recordCount.Load()) + } else if wasCancelled { + job.SetStatus(cleanup.JobStatusCancelled) + s.logger.Infof("Worker %d: cleanup job for height %d marked as cancelled after graceful shutdown", workerID, job.BlockHeight) + } else { + job.SetStatus(cleanup.JobStatusCompleted) } + job.Ended = time.Now() + + prometheusUtxoCleanupBatch.Observe(float64(time.Since(job.Started).Microseconds()) / 1_000_000) finalRecordCount := recordCount.Load() - s.logger.Infof("Worker %d completed cleanup job for block height %d in %v, processed %d records", workerID, job.BlockHeight, job.Ended.Sub(job.Started), finalRecordCount) + if processingErr != nil { + s.logger.Infof("Worker %d failed cleanup job for block height %d after %v, but successfully committed %d records to database before error", + workerID, job.BlockHeight, job.Ended.Sub(job.Started), finalRecordCount) + } else if wasCancelled { + s.logger.Infof("Worker %d cancelled cleanup job for block height %d after %v, processed %d records before cancellation", + workerID, job.BlockHeight, job.Ended.Sub(job.Started), finalRecordCount) + } else { + s.logger.Infof("Worker %d completed cleanup job for block height %d in %v, processed %d records", + workerID, job.BlockHeight, job.Ended.Sub(job.Started), finalRecordCount) + } } func (s *Service) getTxInputsFromBins(job *cleanup.Job, workerID int, bins aerospike.BinMap, txHash *chainhash.Hash) ([]*bt.Input, error) { @@ -431,31 +511,45 @@ func (s *Service) 
getTxInputsFromBins(job *cleanup.Job, workerID int, bins aeros txBytes, err := s.external.Get(s.ctx, txHash.CloneBytes(), fileformat.FileTypeTx) if err != nil { if errors.Is(err, errors.ErrNotFound) { + // Check if outputs exist (sometimes only outputs are stored) exists, err := s.external.Exists(s.ctx, txHash.CloneBytes(), fileformat.FileTypeOutputs) if err != nil { - return nil, errors.NewProcessingError("Worker %d: error checking existence of outputs for external tx %s in cleanup job %d: %v", workerID, txHash.String(), job.BlockHeight, err) + return nil, errors.NewProcessingError("Worker %d: error checking existence of outputs for external tx %s in cleanup job %d", workerID, txHash.String(), job.BlockHeight, err) } if exists { - // nothing more to do here, continue processing other records + // Only outputs exist, no inputs needed for cleanup return nil, nil } - return nil, errors.NewProcessingError("Worker %d: external tx %s not found in external store for cleanup job %d", workerID, txHash.String(), job.BlockHeight) + // External blob already deleted (by LocalDAH or previous cleanup), just need to delete Aerospike record + s.logger.Debugf("Worker %d: external tx %s already deleted from blob store for cleanup job %d, proceeding to delete Aerospike record", + workerID, txHash.String(), job.BlockHeight) + return []*bt.Input{}, nil } + // Other errors should still be reported + return nil, errors.NewProcessingError("Worker %d: error getting external tx %s for cleanup job %d", workerID, txHash.String(), job.BlockHeight, err) } tx, err := bt.NewTxFromBytes(txBytes) if err != nil { - return nil, errors.NewProcessingError("Worker %d: invalid tx bytes for external tx %s in cleanup job %d: %v", workerID, txHash.String(), job.BlockHeight, err) + return nil, errors.NewProcessingError("Worker %d: invalid tx bytes for external tx %s in cleanup job %d", workerID, txHash.String(), job.BlockHeight, err) } inputs = tx.Inputs } else { // get the inputs from the record directly - inputInterfaces, ok := bins[fields.Inputs.String()].([]interface{}) + inputsValue := bins[fields.Inputs.String()] + if inputsValue == nil { + // Inputs field might be nil for certain records (e.g., coinbase) + return []*bt.Input{}, nil + } + + inputInterfaces, ok := inputsValue.([]interface{}) if !ok { - return nil, errors.NewProcessingError("Worker %d: missing inputs for record in cleanup job %d", workerID, job.BlockHeight) + // Log more helpful error with actual type + return nil, errors.NewProcessingError("Worker %d: inputs field has unexpected type %T (expected []interface{}) for record in cleanup job %d", + workerID, inputsValue, job.BlockHeight) } inputs = make([]*bt.Input, len(inputInterfaces)) @@ -465,7 +559,7 @@ func (s *Service) getTxInputsFromBins(job *cleanup.Job, workerID int, bins aeros inputs[i] = &bt.Input{} if _, err := inputs[i].ReadFrom(bytes.NewReader(input)); err != nil { - return nil, errors.NewProcessingError("Worker %d: invalid input for record in cleanup job %d: %v", workerID, job.BlockHeight, err) + return nil, errors.NewProcessingError("Worker %d: invalid input for record in cleanup job %d", workerID, job.BlockHeight, err) } } } @@ -477,11 +571,6 @@ func (s *Service) markJobAsFailed(job *cleanup.Job, err error) { job.SetStatus(cleanup.JobStatusFailed) job.Error = err job.Ended = time.Now() - - if job.DoneCh != nil { - job.DoneCh <- cleanup.JobStatusFailed.String() - close(job.DoneCh) - } } // sendParentUpdateBatch processes a batch of parent update operations @@ -514,7 +603,7 @@ func (s *Service) 
sendParentUpdateBatch(batch []*batchParentUpdate) { parentKey, err := aerospike.NewKey(s.namespace, s.set, keySource) if err != nil { // Send error to this batch item immediately - item.errCh <- errors.NewProcessingError("error creating parent key for input in external tx %s: %v", item.txHash.String(), err) + item.errCh <- errors.NewProcessingError("error creating parent key for input in external tx %s", item.txHash.String(), err) continue } @@ -582,13 +671,13 @@ func (s *Service) sendParentUpdateBatch(batch []*batchParentUpdate) { parentsList = append(parentsList, parent) } - // Execute the batch operation - err := s.client.BatchOperate(s.client.DefaultBatchPolicy, batchRecords) + // Execute the batch operation with custom batch policy + err := s.client.BatchOperate(s.batchPolicy, batchRecords) if err != nil { // Send error to all batch items that have parent updates for _, item := range batch { if len(itemToParents[item]) > 0 { - item.errCh <- errors.NewProcessingError("error batch updating parent records: %v", err) + item.errCh <- errors.NewProcessingError("error batch updating parent records", err) } } return @@ -602,7 +691,7 @@ func (s *Service) sendParentUpdateBatch(batch []*batchParentUpdate) { if batchRec.BatchRec().Err != nil { if !errors.Is(batchRec.BatchRec().Err, aerospike.ErrKeyNotFound) { // Real error occurred - itemErr = errors.NewProcessingError("error updating parent record: %v", batchRec.BatchRec().Err) + itemErr = errors.NewProcessingError("error updating parent record", batchRec.BatchRec().Err) } // For ErrKeyNotFound, itemErr remains nil (success) } @@ -620,6 +709,9 @@ func (s *Service) sendDeleteBatch(batch []*batchDelete) { return } + // Log batch deletion for verification + // s.logger.Debugf("Sending delete batch of %d records to Aerospike", len(batch)) + // Create batch delete records batchRecords := make([]aerospike.BatchRecordIfc, 0, len(batch)) batchDeletePolicy := aerospike.NewBatchDeletePolicy() @@ -628,32 +720,43 @@ func (s *Service) sendDeleteBatch(batch []*batchDelete) { batchRecords = append(batchRecords, aerospike.NewBatchDelete(batchDeletePolicy, item.key)) } - // Execute the batch delete operation - err := s.client.BatchOperate(s.client.DefaultBatchPolicy, batchRecords) + // Execute the batch delete operation with custom batch policy + err := s.client.BatchOperate(s.batchPolicy, batchRecords) if err != nil { // Send error to all batch items for _, item := range batch { - item.errCh <- errors.NewProcessingError("error batch deleting records: %v", err) + item.errCh <- errors.NewProcessingError("error batch deleting records", err) } return } // Check individual batch results and send individual responses + successCount := 0 + alreadyDeletedCount := 0 + errorCount := 0 + for i, batchRec := range batchRecords { if batchRec.BatchRec().Err != nil { if errors.Is(batchRec.BatchRec().Err, aerospike.ErrKeyNotFound) { // Record not found, treat as success (already deleted) batch[i].errCh <- nil + alreadyDeletedCount++ } else { // Real error occurred - batch[i].errCh <- errors.NewProcessingError("error deleting record for tx %s: %v", batch[i].txHash.String(), batchRec.BatchRec().Err) + batch[i].errCh <- errors.NewProcessingError("error deleting record for tx %s", batch[i].txHash.String(), batchRec.BatchRec().Err) + errorCount++ } } else { // Success batch[i].errCh <- nil + successCount++ } } + + // Log batch results for verification + // s.logger.Debugf("Delete batch completed: %d successful, %d already deleted, %d errors (total: %d)", + // successCount, 
alreadyDeletedCount, errorCount, len(batch)) } func (s *Service) ProcessSingleRecord(txid *chainhash.Hash, inputs []*bt.Input) error { @@ -671,7 +774,7 @@ func (s *Service) ProcessSingleRecord(txid *chainhash.Hash, inputs []*bt.Input) // processRecordCleanup processes a single record for cleanup using batchers func (s *Service) processRecordCleanup(job *cleanup.Job, workerID int, rec *aerospike.Result) error { if rec.Err != nil { - return errors.NewProcessingError("Worker %d: error reading record for cleanup job %d: %v", workerID, job.BlockHeight, rec.Err) + return errors.NewProcessingError("Worker %d: error reading record for cleanup job %d", workerID, job.BlockHeight, rec.Err) } // get all the unique parent records of the record being deleted @@ -709,7 +812,7 @@ func (s *Service) processRecordCleanup(job *cleanup.Job, workerID int, rec *aero // Wait for parent update to complete if err = <-parentErrCh; err != nil { - return errors.NewProcessingError("Worker %d: error updating parents for tx %s in cleanup job %d: %v", workerID, txHash.String(), job.BlockHeight, err) + return errors.NewProcessingError("Worker %d: error updating parents for tx %s in cleanup job %d", workerID, txHash.String(), job.BlockHeight, err) } } @@ -724,7 +827,7 @@ func (s *Service) processRecordCleanup(job *cleanup.Job, workerID int, rec *aero // Wait for deletion to complete if err = <-deleteErrCh; err != nil { - return errors.NewProcessingError("Worker %d: error deleting record for tx %s in cleanup job %d: %v", workerID, txHash.String(), job.BlockHeight, err) + return errors.NewProcessingError("Worker %d: error deleting record for tx %s in cleanup job %d", workerID, txHash.String(), job.BlockHeight, err) } // Wait for all operations to complete diff --git a/stores/utxo/aerospike/cleanup_provider.go b/stores/utxo/aerospike/cleanup_provider.go index 06785152f..83610b35f 100644 --- a/stores/utxo/aerospike/cleanup_provider.go +++ b/stores/utxo/aerospike/cleanup_provider.go @@ -65,6 +65,7 @@ func (s *Store) GetCleanupService() (cleanup.Service, error) { Namespace: s.namespace, Set: s.setName, IndexWaiter: s, + WorkerCount: s.settings.Cleanup.WorkerCount, } // Create a new cleanup service diff --git a/stores/utxo/aerospike/unmined_iterator.go b/stores/utxo/aerospike/unmined_iterator.go index c928da1d7..5d210c3a9 100644 --- a/stores/utxo/aerospike/unmined_iterator.go +++ b/stores/utxo/aerospike/unmined_iterator.go @@ -11,6 +11,7 @@ import ( "github.com/bsv-blockchain/teranode/errors" "github.com/bsv-blockchain/teranode/stores/utxo" "github.com/bsv-blockchain/teranode/stores/utxo/fields" + "github.com/bsv-blockchain/teranode/util" ) // unminedTxIterator implements utxo.UnminedTxIterator for Aerospike @@ -64,8 +65,7 @@ func newUnminedTxIterator(store *Store, fullScan bool) (*unminedTxIterator, erro fields.IsCoinbase.String(), } - policy := as.NewQueryPolicy() - policy.MaxRetries = 1 + policy := util.GetAerospikeQueryPolicy(store.settings) policy.IncludeBinData = true store.logger.Infof("[newUnminedTxIterator] Starting Aerospike query for unmined transactions (fullScan=%t)", fullScan) diff --git a/stores/utxo/sql/cleanup/cleanup_service.go b/stores/utxo/sql/cleanup/cleanup_service.go index 79638eb3b..99f12ff29 100644 --- a/stores/utxo/sql/cleanup/cleanup_service.go +++ b/stores/utxo/sql/cleanup/cleanup_service.go @@ -178,8 +178,12 @@ func (s *Service) processCleanupJob(job *cleanup.Job, workerID int) { } } + // Log start of cleanup + s.logger.Infof("[SQLCleanupService %d] starting cleanup scan for height %d (delete_at_height 
<= %d)", + workerID, job.BlockHeight, safeCleanupHeight) + // Execute the cleanup with safe height - err := deleteTombstoned(s.db, safeCleanupHeight) + deletedCount, err := deleteTombstonedWithCount(s.db, safeCleanupHeight) if err != nil { job.SetStatus(cleanup.JobStatusFailed) @@ -187,23 +191,30 @@ func (s *Service) processCleanupJob(job *cleanup.Job, workerID int) { job.Ended = time.Now() s.logger.Errorf("[SQLCleanupService %d] cleanup job failed for block height %d: %v", workerID, job.BlockHeight, err) - - if job.DoneCh != nil { - job.DoneCh <- cleanup.JobStatusFailed.String() - close(job.DoneCh) - } } else { job.SetStatus(cleanup.JobStatusCompleted) job.Ended = time.Now() - s.logger.Debugf("[SQLCleanupService %d] cleanup job completed for block height %d in %v", - workerID, job.BlockHeight, time.Since(job.Started)) + s.logger.Infof("[SQLCleanupService %d] cleanup job completed for block height %d in %v - deleted %d records", + workerID, job.BlockHeight, time.Since(job.Started), deletedCount) + } +} - if job.DoneCh != nil { - job.DoneCh <- cleanup.JobStatusCompleted.String() - close(job.DoneCh) - } +// deleteTombstonedWithCount removes transactions that have passed their expiration time and returns the count. +func deleteTombstonedWithCount(db *usql.DB, blockHeight uint32) (int64, error) { + // Delete transactions that have passed their expiration time + // this will cascade to inputs, outputs, block_ids and conflicting_children + result, err := db.Exec("DELETE FROM transactions WHERE delete_at_height <= $1", blockHeight) + if err != nil { + return 0, errors.NewStorageError("failed to delete transactions", err) + } + + count, err := result.RowsAffected() + if err != nil { + return 0, errors.NewStorageError("failed to get rows affected", err) } + + return count, nil } // deleteTombstoned removes transactions that have passed their expiration time. 
diff --git a/util/aerospike.go b/util/aerospike.go index a12ff4f5f..b0b010e0e 100644 --- a/util/aerospike.go +++ b/util/aerospike.go @@ -45,6 +45,12 @@ var batchSleepBetweenRetries time.Duration var batchSleepMultiplier float64 var concurrentNodes int +var queryMaxRetries int +var queryTotalTimeout time.Duration +var querySocketTimeout time.Duration +var querySleepBetweenRetries time.Duration +var querySleepMultiplier float64 + var aerospikePrometheusMetrics = *safemap.New[string, prometheus.Counter]() var aerospikePrometheusHistograms = *safemap.New[string, prometheus.Histogram]() @@ -212,6 +218,44 @@ func getAerospikeClient(logger ulogger.Logger, url *url.URL, tSettings *settings return nil, err } + // Query policy stuff + queryPolicyURL := tSettings.Aerospike.QueryPolicyURL + if queryPolicyURL == nil { + // If no query policy is set, use default values + queryMaxRetries = aerospike.NewQueryPolicy().MaxRetries + queryTotalTimeout = aerospike.NewQueryPolicy().TotalTimeout + querySocketTimeout = aerospike.NewQueryPolicy().SocketTimeout + querySleepBetweenRetries = aerospike.NewQueryPolicy().SleepBetweenRetries + querySleepMultiplier = aerospike.NewQueryPolicy().SleepMultiplier + } else { + logger.Infof("[Aerospike] queryPolicy url %s", queryPolicyURL) + + queryMaxRetries, err = getQueryInt(queryPolicyURL, "MaxRetries", aerospike.NewQueryPolicy().MaxRetries, logger) + if err != nil { + return nil, err + } + + queryTotalTimeout, err = getQueryDuration(queryPolicyURL, "TotalTimeout", aerospike.NewQueryPolicy().TotalTimeout, logger) + if err != nil { + return nil, err + } + + querySocketTimeout, err = getQueryDuration(queryPolicyURL, "SocketTimeout", aerospike.NewQueryPolicy().SocketTimeout, logger) + if err != nil { + return nil, err + } + + querySleepBetweenRetries, err = getQueryDuration(queryPolicyURL, "SleepBetweenRetries", aerospike.NewQueryPolicy().SleepBetweenRetries, logger) + if err != nil { + return nil, err + } + + querySleepMultiplier, err = getQueryFloat64(queryPolicyURL, "SleepMultiplier", aerospike.NewQueryPolicy().SleepMultiplier, logger) + if err != nil { + return nil, err + } + } + // todo optimize these https://github.com/aerospike/aerospike-client-go/issues/256#issuecomment-479964112 // todo optimize read policies // todo optimize write policies @@ -717,3 +761,21 @@ func GetAerospikeBatchReadPolicy(tSettings *settings.Settings) *aerospike.BatchR return batchReadPolicy } + +// GetAerospikeQueryPolicy creates a new Aerospike query policy with configured settings. +// Used for query operations to scan and filter records. +func GetAerospikeQueryPolicy(tSettings *settings.Settings) *aerospike.QueryPolicy { + queryPolicy := aerospike.NewQueryPolicy() + + if tSettings.Aerospike.UseDefaultPolicies { + return queryPolicy + } + + queryPolicy.MaxRetries = queryMaxRetries + queryPolicy.TotalTimeout = queryTotalTimeout + queryPolicy.SocketTimeout = querySocketTimeout + queryPolicy.SleepBetweenRetries = querySleepBetweenRetries + queryPolicy.SleepMultiplier = querySleepMultiplier + + return queryPolicy +}
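To illustrate how the new query-policy plumbing is meant to be consumed: the aerospike_queryPolicy URL in settings.conf (e.g. aerospike:///?MaxRetries=3&SleepBetweenRetries=500ms&SleepMultiplier=1&TotalTimeout=30m&SocketTimeout=25m) is parsed once in getAerospikeClient, and callers such as the Aerospike cleanup service and the unmined-transaction iterator pick up the resulting values via util.GetAerospikeQueryPolicy, overriding only what they need (IncludeBinData in the cleanup case). The sketch below is illustrative only; the package name, the scanDeleteAtHeight helper, and the aerospike-client-go major version in the import path are assumptions, not part of the diff.

package example

import (
	aerospike "github.com/aerospike/aerospike-client-go/v7" // major version assumed; match the repo's go.mod

	"github.com/bsv-blockchain/teranode/settings"
	"github.com/bsv-blockchain/teranode/util"
)

// scanDeleteAtHeight is a hypothetical caller showing the intended pattern: start from
// the shared query policy (MaxRetries, TotalTimeout, SocketTimeout and the sleep settings
// all come from the aerospike_queryPolicy URL, or Aerospike defaults when it is unset)
// and override only what the specific scan needs.
func scanDeleteAtHeight(client *aerospike.Client, tSettings *settings.Settings, stmt *aerospike.Statement) (*aerospike.Recordset, error) {
	policy := util.GetAerospikeQueryPolicy(tSettings)
	policy.IncludeBinData = true // cleanup-style scans need the bin data, as in cleanup_service.go

	rs, err := client.Query(policy, stmt)
	if err != nil {
		return nil, err
	}

	return rs, nil
}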