diff --git a/Makefile b/Makefile
index 2d300b9b..e7c44da4 100644
--- a/Makefile
+++ b/Makefile
@@ -114,7 +114,7 @@ SETUP_SCRIPT=tests/scripts/setup-supernodes.sh
 # Optional: specify lumera binary path to skip download
 LUMERAD_BINARY ?=
 # Optional: specify installation mode (latest-release, latest-tag, or vX.Y.Z)
-INSTALL_MODE ?=latest-tag
+INSTALL_MODE ?=v1.7.2

 install-lumera:
 	@echo "Installing Lumera..."
diff --git a/gen/supernode/service.swagger.json b/gen/supernode/service.swagger.json
index c3944e9d..523499b8 100644
--- a/gen/supernode/service.swagger.json
+++ b/gen/supernode/service.swagger.json
@@ -744,6 +744,25 @@
       },
       "title": "System resource information"
     },
+    "StatusResponseServiceTasks": {
+      "type": "object",
+      "properties": {
+        "serviceName": {
+          "type": "string"
+        },
+        "taskIds": {
+          "type": "array",
+          "items": {
+            "type": "string"
+          }
+        },
+        "taskCount": {
+          "type": "integer",
+          "format": "int32"
+        }
+      },
+      "title": "ServiceTasks contains task information for a specific service"
+    },
     "protobufAny": {
       "type": "object",
       "properties": {
@@ -827,6 +846,14 @@
         "resources": {
          "$ref": "#/definitions/StatusResponseResources"
         },
+        "runningTasks": {
+          "type": "array",
+          "items": {
+            "type": "object",
+            "$ref": "#/definitions/StatusResponseServiceTasks"
+          },
+          "title": "Services with currently running tasks"
+        },
         "registeredServices": {
           "type": "array",
           "items": {
diff --git a/gen/supernode/status.pb.go b/gen/supernode/status.pb.go
index a659d729..74e0d6d7 100644
--- a/gen/supernode/status.pb.go
+++ b/gen/supernode/status.pb.go
@@ -7,11 +7,12 @@
 package supernode

 import (
-	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
-	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
 	reflect "reflect"
 	sync "sync"
 	unsafe "unsafe"
+
+	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
 )

 const (
@@ -70,15 +71,16 @@ func (x *StatusRequest) GetIncludeP2PMetrics() bool {

 // The StatusResponse represents system status with clear organization
 type StatusResponse struct {
-	state protoimpl.MessageState `protogen:"open.v1"`
-	Version string `protobuf:"bytes,1,opt,name=version,proto3" json:"version,omitempty"` // Supernode version
-	UptimeSeconds uint64 `protobuf:"varint,2,opt,name=uptime_seconds,json=uptimeSeconds,proto3" json:"uptime_seconds,omitempty"` // Uptime in seconds
-	Resources *StatusResponse_Resources `protobuf:"bytes,3,opt,name=resources,proto3" json:"resources,omitempty"`
-	RegisteredServices []string `protobuf:"bytes,5,rep,name=registered_services,json=registeredServices,proto3" json:"registered_services,omitempty"` // All registered/available services
-	Network *StatusResponse_Network `protobuf:"bytes,6,opt,name=network,proto3" json:"network,omitempty"` // P2P network information
-	Rank int32 `protobuf:"varint,7,opt,name=rank,proto3" json:"rank,omitempty"` // Rank in the top supernodes list (0 if not in top list)
-	IpAddress string `protobuf:"bytes,8,opt,name=ip_address,json=ipAddress,proto3" json:"ip_address,omitempty"` // Supernode IP address with port (e.g., "192.168.1.1:4445")
-	P2PMetrics *StatusResponse_P2PMetrics `protobuf:"bytes,9,opt,name=p2p_metrics,json=p2pMetrics,proto3" json:"p2p_metrics,omitempty"`
+	state protoimpl.MessageState `protogen:"open.v1"`
+	Version string `protobuf:"bytes,1,opt,name=version,proto3" json:"version,omitempty"` // Supernode version
+	UptimeSeconds uint64 `protobuf:"varint,2,opt,name=uptime_seconds,json=uptimeSeconds,proto3" json:"uptime_seconds,omitempty"` // Uptime in seconds
+	Resources *StatusResponse_Resources `protobuf:"bytes,3,opt,name=resources,proto3" json:"resources,omitempty"`
+	RunningTasks []*StatusResponse_ServiceTasks `protobuf:"bytes,4,rep,name=running_tasks,json=runningTasks,proto3" json:"running_tasks,omitempty"` // Services with currently running tasks
+	RegisteredServices []string `protobuf:"bytes,5,rep,name=registered_services,json=registeredServices,proto3" json:"registered_services,omitempty"` // All registered/available services
+	Network *StatusResponse_Network `protobuf:"bytes,6,opt,name=network,proto3" json:"network,omitempty"` // P2P network information
+	Rank int32 `protobuf:"varint,7,opt,name=rank,proto3" json:"rank,omitempty"` // Rank in the top supernodes list (0 if not in top list)
+	IpAddress string `protobuf:"bytes,8,opt,name=ip_address,json=ipAddress,proto3" json:"ip_address,omitempty"` // Supernode IP address with port (e.g., "192.168.1.1:4445")
+	P2PMetrics *StatusResponse_P2PMetrics `protobuf:"bytes,9,opt,name=p2p_metrics,json=p2pMetrics,proto3" json:"p2p_metrics,omitempty"`
 	unknownFields protoimpl.UnknownFields
 	sizeCache protoimpl.SizeCache
 }
@@ -134,6 +136,13 @@ func (x *StatusResponse) GetResources() *StatusResponse_Resources {
 	return nil
 }

+func (x *StatusResponse) GetRunningTasks() []*StatusResponse_ServiceTasks {
+	if x != nil {
+		return x.RunningTasks
+	}
+	return nil
+}
+
 func (x *StatusResponse) GetRegisteredServices() []string {
 	if x != nil {
 		return x.RegisteredServices
@@ -238,6 +247,67 @@ func (x *StatusResponse_Resources) GetHardwareSummary() string {
 	return ""
 }

+// ServiceTasks contains task information for a specific service
+type StatusResponse_ServiceTasks struct {
+	state protoimpl.MessageState `protogen:"open.v1"`
+	ServiceName string `protobuf:"bytes,1,opt,name=service_name,json=serviceName,proto3" json:"service_name,omitempty"`
+	TaskIds []string `protobuf:"bytes,2,rep,name=task_ids,json=taskIds,proto3" json:"task_ids,omitempty"`
+	TaskCount int32 `protobuf:"varint,3,opt,name=task_count,json=taskCount,proto3" json:"task_count,omitempty"`
+	unknownFields protoimpl.UnknownFields
+	sizeCache protoimpl.SizeCache
+}
+
+func (x *StatusResponse_ServiceTasks) Reset() {
+	*x = StatusResponse_ServiceTasks{}
+	mi := &file_supernode_status_proto_msgTypes[3]
+	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+	ms.StoreMessageInfo(mi)
+}
+
+func (x *StatusResponse_ServiceTasks) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*StatusResponse_ServiceTasks) ProtoMessage() {}
+
+func (x *StatusResponse_ServiceTasks) ProtoReflect() protoreflect.Message {
+	mi := &file_supernode_status_proto_msgTypes[3]
+	if x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use StatusResponse_ServiceTasks.ProtoReflect.Descriptor instead.
+func (*StatusResponse_ServiceTasks) Descriptor() ([]byte, []int) {
+	return file_supernode_status_proto_rawDescGZIP(), []int{1, 1}
+}
+
+func (x *StatusResponse_ServiceTasks) GetServiceName() string {
+	if x != nil {
+		return x.ServiceName
+	}
+	return ""
+}
+
+func (x *StatusResponse_ServiceTasks) GetTaskIds() []string {
+	if x != nil {
+		return x.TaskIds
+	}
+	return nil
+}
+
+func (x *StatusResponse_ServiceTasks) GetTaskCount() int32 {
+	if x != nil {
+		return x.TaskCount
+	}
+	return 0
+}
+
 // Network information
 type StatusResponse_Network struct {
 	state protoimpl.MessageState `protogen:"open.v1"`
@@ -249,7 +319,7 @@ type StatusResponse_Network struct {

 func (x *StatusResponse_Network) Reset() {
 	*x = StatusResponse_Network{}
-	mi := &file_supernode_status_proto_msgTypes[3]
+	mi := &file_supernode_status_proto_msgTypes[4]
 	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 	ms.StoreMessageInfo(mi)
 }
@@ -261,7 +331,7 @@ func (x *StatusResponse_Network) String() string {

 func (*StatusResponse_Network) ProtoMessage() {}

 func (x *StatusResponse_Network) ProtoReflect() protoreflect.Message {
-	mi := &file_supernode_status_proto_msgTypes[3]
+	mi := &file_supernode_status_proto_msgTypes[4]
 	if x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -274,7 +344,7 @@ func (x *StatusResponse_Network) ProtoReflect() protoreflect.Message {

 // Deprecated: Use StatusResponse_Network.ProtoReflect.Descriptor instead.
 func (*StatusResponse_Network) Descriptor() ([]byte, []int) {
-	return file_supernode_status_proto_rawDescGZIP(), []int{1, 1}
+	return file_supernode_status_proto_rawDescGZIP(), []int{1, 2}
 }

 func (x *StatusResponse_Network) GetPeersCount() int32 {
@@ -306,7 +376,7 @@ type StatusResponse_P2PMetrics struct {

 func (x *StatusResponse_P2PMetrics) Reset() {
 	*x = StatusResponse_P2PMetrics{}
-	mi := &file_supernode_status_proto_msgTypes[4]
+	mi := &file_supernode_status_proto_msgTypes[5]
 	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 	ms.StoreMessageInfo(mi)
 }
@@ -318,7 +388,7 @@ func (x *StatusResponse_P2PMetrics) String() string {

 func (*StatusResponse_P2PMetrics) ProtoMessage() {}

 func (x *StatusResponse_P2PMetrics) ProtoReflect() protoreflect.Message {
-	mi := &file_supernode_status_proto_msgTypes[4]
+	mi := &file_supernode_status_proto_msgTypes[5]
 	if x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -331,7 +401,7 @@ func (x *StatusResponse_P2PMetrics) ProtoReflect() protoreflect.Message {

 // Deprecated: Use StatusResponse_P2PMetrics.ProtoReflect.Descriptor instead.
 func (*StatusResponse_P2PMetrics) Descriptor() ([]byte, []int) {
-	return file_supernode_status_proto_rawDescGZIP(), []int{1, 2}
+	return file_supernode_status_proto_rawDescGZIP(), []int{1, 3}
 }

 func (x *StatusResponse_P2PMetrics) GetDhtMetrics() *StatusResponse_P2PMetrics_DhtMetrics {
@@ -386,7 +456,7 @@ type StatusResponse_Resources_CPU struct {

 func (x *StatusResponse_Resources_CPU) Reset() {
 	*x = StatusResponse_Resources_CPU{}
-	mi := &file_supernode_status_proto_msgTypes[5]
+	mi := &file_supernode_status_proto_msgTypes[6]
 	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 	ms.StoreMessageInfo(mi)
 }
@@ -398,7 +468,7 @@ func (x *StatusResponse_Resources_CPU) String() string {

 func (*StatusResponse_Resources_CPU) ProtoMessage() {}

 func (x *StatusResponse_Resources_CPU) ProtoReflect() protoreflect.Message {
-	mi := &file_supernode_status_proto_msgTypes[5]
+	mi := &file_supernode_status_proto_msgTypes[6]
 	if x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -440,7 +510,7 @@ type StatusResponse_Resources_Memory struct {

 func (x *StatusResponse_Resources_Memory) Reset() {
 	*x = StatusResponse_Resources_Memory{}
-	mi := &file_supernode_status_proto_msgTypes[6]
+	mi := &file_supernode_status_proto_msgTypes[7]
 	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 	ms.StoreMessageInfo(mi)
 }
@@ -452,7 +522,7 @@ func (x *StatusResponse_Resources_Memory) String() string {

 func (*StatusResponse_Resources_Memory) ProtoMessage() {}

 func (x *StatusResponse_Resources_Memory) ProtoReflect() protoreflect.Message {
-	mi := &file_supernode_status_proto_msgTypes[6]
+	mi := &file_supernode_status_proto_msgTypes[7]
 	if x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -509,7 +579,7 @@ type StatusResponse_Resources_Storage struct {

 func (x *StatusResponse_Resources_Storage) Reset() {
 	*x = StatusResponse_Resources_Storage{}
-	mi := &file_supernode_status_proto_msgTypes[7]
+	mi := &file_supernode_status_proto_msgTypes[8]
 	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 	ms.StoreMessageInfo(mi)
 }
@@ -521,7 +591,7 @@ func (x *StatusResponse_Resources_Storage) String() string {

 func (*StatusResponse_Resources_Storage) ProtoMessage() {}

 func (x *StatusResponse_Resources_Storage) ProtoReflect() protoreflect.Message {
-	mi := &file_supernode_status_proto_msgTypes[7]
+	mi := &file_supernode_status_proto_msgTypes[8]
 	if x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -585,7 +655,7 @@ type StatusResponse_P2PMetrics_DhtMetrics struct {

 func (x *StatusResponse_P2PMetrics_DhtMetrics) Reset() {
 	*x = StatusResponse_P2PMetrics_DhtMetrics{}
-	mi := &file_supernode_status_proto_msgTypes[8]
+	mi := &file_supernode_status_proto_msgTypes[9]
 	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 	ms.StoreMessageInfo(mi)
 }
@@ -597,7 +667,7 @@ func (x *StatusResponse_P2PMetrics_DhtMetrics) String() string {

 func (*StatusResponse_P2PMetrics_DhtMetrics) ProtoMessage() {}

 func (x *StatusResponse_P2PMetrics_DhtMetrics) ProtoReflect() protoreflect.Message {
-	mi := &file_supernode_status_proto_msgTypes[8]
+	mi := &file_supernode_status_proto_msgTypes[9]
 	if x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -610,7 +680,7 @@ func (x *StatusResponse_P2PMetrics_DhtMetrics) ProtoReflect() protoreflect.Messa

 // Deprecated: Use StatusResponse_P2PMetrics_DhtMetrics.ProtoReflect.Descriptor instead.
 func (*StatusResponse_P2PMetrics_DhtMetrics) Descriptor() ([]byte, []int) {
-	return file_supernode_status_proto_rawDescGZIP(), []int{1, 2, 0}
+	return file_supernode_status_proto_rawDescGZIP(), []int{1, 3, 0}
 }

 func (x *StatusResponse_P2PMetrics_DhtMetrics) GetStoreSuccessRecent() []*StatusResponse_P2PMetrics_DhtMetrics_StoreSuccessPoint {
@@ -654,7 +724,7 @@ type StatusResponse_P2PMetrics_HandleCounters struct {

 func (x *StatusResponse_P2PMetrics_HandleCounters) Reset() {
 	*x = StatusResponse_P2PMetrics_HandleCounters{}
-	mi := &file_supernode_status_proto_msgTypes[9]
+	mi := &file_supernode_status_proto_msgTypes[10]
 	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 	ms.StoreMessageInfo(mi)
 }
@@ -666,7 +736,7 @@ func (x *StatusResponse_P2PMetrics_HandleCounters) String() string {

 func (*StatusResponse_P2PMetrics_HandleCounters) ProtoMessage() {}

 func (x *StatusResponse_P2PMetrics_HandleCounters) ProtoReflect() protoreflect.Message {
-	mi := &file_supernode_status_proto_msgTypes[9]
+	mi := &file_supernode_status_proto_msgTypes[10]
 	if x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -679,7 +749,7 @@ func (x *StatusResponse_P2PMetrics_HandleCounters) ProtoReflect() protoreflect.M

 // Deprecated: Use StatusResponse_P2PMetrics_HandleCounters.ProtoReflect.Descriptor instead.
 func (*StatusResponse_P2PMetrics_HandleCounters) Descriptor() ([]byte, []int) {
-	return file_supernode_status_proto_rawDescGZIP(), []int{1, 2, 1}
+	return file_supernode_status_proto_rawDescGZIP(), []int{1, 3, 1}
 }

 func (x *StatusResponse_P2PMetrics_HandleCounters) GetTotal() int64 {
@@ -725,7 +795,7 @@ type StatusResponse_P2PMetrics_BanEntry struct {

 func (x *StatusResponse_P2PMetrics_BanEntry) Reset() {
 	*x = StatusResponse_P2PMetrics_BanEntry{}
-	mi := &file_supernode_status_proto_msgTypes[10]
+	mi := &file_supernode_status_proto_msgTypes[11]
 	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 	ms.StoreMessageInfo(mi)
 }
@@ -737,7 +807,7 @@ func (x *StatusResponse_P2PMetrics_BanEntry) String() string {

 func (*StatusResponse_P2PMetrics_BanEntry) ProtoMessage() {}

 func (x *StatusResponse_P2PMetrics_BanEntry) ProtoReflect() protoreflect.Message {
-	mi := &file_supernode_status_proto_msgTypes[10]
+	mi := &file_supernode_status_proto_msgTypes[11]
 	if x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -750,7 +820,7 @@ func (x *StatusResponse_P2PMetrics_BanEntry) ProtoReflect() protoreflect.Message

 // Deprecated: Use StatusResponse_P2PMetrics_BanEntry.ProtoReflect.Descriptor instead.
 func (*StatusResponse_P2PMetrics_BanEntry) Descriptor() ([]byte, []int) {
-	return file_supernode_status_proto_rawDescGZIP(), []int{1, 2, 2}
+	return file_supernode_status_proto_rawDescGZIP(), []int{1, 3, 2}
 }

 func (x *StatusResponse_P2PMetrics_BanEntry) GetId() string {
@@ -806,7 +876,7 @@ type StatusResponse_P2PMetrics_DatabaseStats struct {

 func (x *StatusResponse_P2PMetrics_DatabaseStats) Reset() {
 	*x = StatusResponse_P2PMetrics_DatabaseStats{}
-	mi := &file_supernode_status_proto_msgTypes[11]
+	mi := &file_supernode_status_proto_msgTypes[12]
 	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 	ms.StoreMessageInfo(mi)
 }
@@ -818,7 +888,7 @@ func (x *StatusResponse_P2PMetrics_DatabaseStats) String() string {

 func (*StatusResponse_P2PMetrics_DatabaseStats) ProtoMessage() {}

 func (x *StatusResponse_P2PMetrics_DatabaseStats) ProtoReflect() protoreflect.Message {
-	mi := &file_supernode_status_proto_msgTypes[11]
+	mi := &file_supernode_status_proto_msgTypes[12]
 	if x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -831,7 +901,7 @@ func (x *StatusResponse_P2PMetrics_DatabaseStats) ProtoReflect() protoreflect.Me

 // Deprecated: Use StatusResponse_P2PMetrics_DatabaseStats.ProtoReflect.Descriptor instead.
 func (*StatusResponse_P2PMetrics_DatabaseStats) Descriptor() ([]byte, []int) {
-	return file_supernode_status_proto_rawDescGZIP(), []int{1, 2, 3}
+	return file_supernode_status_proto_rawDescGZIP(), []int{1, 3, 3}
 }

 func (x *StatusResponse_P2PMetrics_DatabaseStats) GetP2PDbSizeMb() float64 {
@@ -860,7 +930,7 @@ type StatusResponse_P2PMetrics_DiskStatus struct {

 func (x *StatusResponse_P2PMetrics_DiskStatus) Reset() {
 	*x = StatusResponse_P2PMetrics_DiskStatus{}
-	mi := &file_supernode_status_proto_msgTypes[12]
+	mi := &file_supernode_status_proto_msgTypes[13]
 	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 	ms.StoreMessageInfo(mi)
 }
@@ -872,7 +942,7 @@ func (x *StatusResponse_P2PMetrics_DiskStatus) String() string {

 func (*StatusResponse_P2PMetrics_DiskStatus) ProtoMessage() {}

 func (x *StatusResponse_P2PMetrics_DiskStatus) ProtoReflect() protoreflect.Message {
-	mi := &file_supernode_status_proto_msgTypes[12]
+	mi := &file_supernode_status_proto_msgTypes[13]
 	if x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -885,7 +955,7 @@ func (x *StatusResponse_P2PMetrics_DiskStatus) ProtoReflect() protoreflect.Messa

 // Deprecated: Use StatusResponse_P2PMetrics_DiskStatus.ProtoReflect.Descriptor instead.
 func (*StatusResponse_P2PMetrics_DiskStatus) Descriptor() ([]byte, []int) {
-	return file_supernode_status_proto_rawDescGZIP(), []int{1, 2, 4}
+	return file_supernode_status_proto_rawDescGZIP(), []int{1, 3, 4}
 }

 func (x *StatusResponse_P2PMetrics_DiskStatus) GetAllMb() float64 {
@@ -921,7 +991,7 @@ type StatusResponse_P2PMetrics_DhtMetrics_StoreSuccessPoint struct {

 func (x *StatusResponse_P2PMetrics_DhtMetrics_StoreSuccessPoint) Reset() {
 	*x = StatusResponse_P2PMetrics_DhtMetrics_StoreSuccessPoint{}
-	mi := &file_supernode_status_proto_msgTypes[15]
+	mi := &file_supernode_status_proto_msgTypes[16]
 	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 	ms.StoreMessageInfo(mi)
 }
@@ -933,7 +1003,7 @@ func (x *StatusResponse_P2PMetrics_DhtMetrics_StoreSuccessPoint) String() string

 func (*StatusResponse_P2PMetrics_DhtMetrics_StoreSuccessPoint) ProtoMessage() {}

 func (x *StatusResponse_P2PMetrics_DhtMetrics_StoreSuccessPoint) ProtoReflect() protoreflect.Message {
-	mi := &file_supernode_status_proto_msgTypes[15]
+	mi := &file_supernode_status_proto_msgTypes[16]
 	if x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -946,7 +1016,7 @@ func (x *StatusResponse_P2PMetrics_DhtMetrics_StoreSuccessPoint) ProtoReflect()

 // Deprecated: Use StatusResponse_P2PMetrics_DhtMetrics_StoreSuccessPoint.ProtoReflect.Descriptor instead.
 func (*StatusResponse_P2PMetrics_DhtMetrics_StoreSuccessPoint) Descriptor() ([]byte, []int) {
-	return file_supernode_status_proto_rawDescGZIP(), []int{1, 2, 0, 0}
+	return file_supernode_status_proto_rawDescGZIP(), []int{1, 3, 0, 0}
 }

 func (x *StatusResponse_P2PMetrics_DhtMetrics_StoreSuccessPoint) GetTimeUnix() int64 {
@@ -991,7 +1061,7 @@ type StatusResponse_P2PMetrics_DhtMetrics_BatchRetrievePoint struct {

 func (x *StatusResponse_P2PMetrics_DhtMetrics_BatchRetrievePoint) Reset() {
 	*x = StatusResponse_P2PMetrics_DhtMetrics_BatchRetrievePoint{}
-	mi := &file_supernode_status_proto_msgTypes[16]
+	mi := &file_supernode_status_proto_msgTypes[17]
 	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 	ms.StoreMessageInfo(mi)
 }
@@ -1003,7 +1073,7 @@ func (x *StatusResponse_P2PMetrics_DhtMetrics_BatchRetrievePoint) String() strin

 func (*StatusResponse_P2PMetrics_DhtMetrics_BatchRetrievePoint) ProtoMessage() {}

 func (x *StatusResponse_P2PMetrics_DhtMetrics_BatchRetrievePoint) ProtoReflect() protoreflect.Message {
-	mi := &file_supernode_status_proto_msgTypes[16]
+	mi := &file_supernode_status_proto_msgTypes[17]
 	if x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -1016,7 +1086,7 @@ func (x *StatusResponse_P2PMetrics_DhtMetrics_BatchRetrievePoint) ProtoReflect()

 // Deprecated: Use StatusResponse_P2PMetrics_DhtMetrics_BatchRetrievePoint.ProtoReflect.Descriptor instead.
 func (*StatusResponse_P2PMetrics_DhtMetrics_BatchRetrievePoint) Descriptor() ([]byte, []int) {
-	return file_supernode_status_proto_rawDescGZIP(), []int{1, 2, 0, 1}
+	return file_supernode_status_proto_rawDescGZIP(), []int{1, 3, 0, 1}
 }

 func (x *StatusResponse_P2PMetrics_DhtMetrics_BatchRetrievePoint) GetTimeUnix() int64 {
@@ -1067,11 +1137,12 @@ const file_supernode_status_proto_rawDesc = "" +
 	"\n" +
 	"\x16supernode/status.proto\x12\tsupernode\"?\n" +
 	"\rStatusRequest\x12.\n" +
-	"\x13include_p2p_metrics\x18\x01 \x01(\bR\x11includeP2pMetrics\"\xca\x17\n" +
+	"\x13include_p2p_metrics\x18\x01 \x01(\bR\x11includeP2pMetrics\"\x84\x19\n" +
 	"\x0eStatusResponse\x12\x18\n" +
 	"\aversion\x18\x01 \x01(\tR\aversion\x12%\n" +
 	"\x0euptime_seconds\x18\x02 \x01(\x04R\ruptimeSeconds\x12A\n" +
-	"\tresources\x18\x03 \x01(\v2#.supernode.StatusResponse.ResourcesR\tresources\x12/\n" +
+	"\tresources\x18\x03 \x01(\v2#.supernode.StatusResponse.ResourcesR\tresources\x12K\n" +
+	"\rrunning_tasks\x18\x04 \x03(\v2&.supernode.StatusResponse.ServiceTasksR\frunningTasks\x12/\n" +
 	"\x13registered_services\x18\x05 \x03(\tR\x12registeredServices\x12;\n" +
 	"\anetwork\x18\x06 \x01(\v2!.supernode.StatusResponse.NetworkR\anetwork\x12\x12\n" +
 	"\x04rank\x18\a \x01(\x05R\x04rank\x12\x1d\n" +
@@ -1099,7 +1170,12 @@ const file_supernode_status_proto_rawDesc = "" +
 	"\n" +
 	"used_bytes\x18\x03 \x01(\x04R\tusedBytes\x12'\n" +
 	"\x0favailable_bytes\x18\x04 \x01(\x04R\x0eavailableBytes\x12#\n" +
-	"\rusage_percent\x18\x05 \x01(\x01R\fusagePercent\x1aQ\n" +
+	"\rusage_percent\x18\x05 \x01(\x01R\fusagePercent\x1ak\n" +
+	"\fServiceTasks\x12!\n" +
+	"\fservice_name\x18\x01 \x01(\tR\vserviceName\x12\x19\n" +
+	"\btask_ids\x18\x02 \x03(\tR\ataskIds\x12\x1d\n" +
+	"\n" +
+	"task_count\x18\x03 \x01(\x05R\ttaskCount\x1aQ\n" +
 	"\aNetwork\x12\x1f\n" +
 	"\vpeers_count\x18\x01 \x01(\x05R\n" +
 	"peersCount\x12%\n" +
@@ -1175,47 +1251,49 @@ func file_supernode_status_proto_rawDescGZIP() []byte {
 	return file_supernode_status_proto_rawDescData
 }

-var file_supernode_status_proto_msgTypes = make([]protoimpl.MessageInfo, 17)
+var file_supernode_status_proto_msgTypes = make([]protoimpl.MessageInfo, 18)
 var file_supernode_status_proto_goTypes = []any{
 	(*StatusRequest)(nil), // 0: supernode.StatusRequest
 	(*StatusResponse)(nil), // 1: supernode.StatusResponse
 	(*StatusResponse_Resources)(nil), // 2: supernode.StatusResponse.Resources
-	(*StatusResponse_Network)(nil), // 3: supernode.StatusResponse.Network
-	(*StatusResponse_P2PMetrics)(nil), // 4: supernode.StatusResponse.P2PMetrics
-	(*StatusResponse_Resources_CPU)(nil), // 5: supernode.StatusResponse.Resources.CPU
-	(*StatusResponse_Resources_Memory)(nil), // 6: supernode.StatusResponse.Resources.Memory
-	(*StatusResponse_Resources_Storage)(nil), // 7: supernode.StatusResponse.Resources.Storage
-	(*StatusResponse_P2PMetrics_DhtMetrics)(nil), // 8: supernode.StatusResponse.P2PMetrics.DhtMetrics
-	(*StatusResponse_P2PMetrics_HandleCounters)(nil), // 9: supernode.StatusResponse.P2PMetrics.HandleCounters
-	(*StatusResponse_P2PMetrics_BanEntry)(nil), // 10: supernode.StatusResponse.P2PMetrics.BanEntry
-	(*StatusResponse_P2PMetrics_DatabaseStats)(nil), // 11: supernode.StatusResponse.P2PMetrics.DatabaseStats
-	(*StatusResponse_P2PMetrics_DiskStatus)(nil), // 12: supernode.StatusResponse.P2PMetrics.DiskStatus
-	nil, // 13: supernode.StatusResponse.P2PMetrics.NetworkHandleMetricsEntry
-	nil, // 14: supernode.StatusResponse.P2PMetrics.ConnPoolMetricsEntry
-	(*StatusResponse_P2PMetrics_DhtMetrics_StoreSuccessPoint)(nil), // 15: supernode.StatusResponse.P2PMetrics.DhtMetrics.StoreSuccessPoint
-	(*StatusResponse_P2PMetrics_DhtMetrics_BatchRetrievePoint)(nil), // 16: supernode.StatusResponse.P2PMetrics.DhtMetrics.BatchRetrievePoint
+	(*StatusResponse_ServiceTasks)(nil), // 3: supernode.StatusResponse.ServiceTasks
+	(*StatusResponse_Network)(nil), // 4: supernode.StatusResponse.Network
+	(*StatusResponse_P2PMetrics)(nil), // 5: supernode.StatusResponse.P2PMetrics
+	(*StatusResponse_Resources_CPU)(nil), // 6: supernode.StatusResponse.Resources.CPU
+	(*StatusResponse_Resources_Memory)(nil), // 7: supernode.StatusResponse.Resources.Memory
+	(*StatusResponse_Resources_Storage)(nil), // 8: supernode.StatusResponse.Resources.Storage
+	(*StatusResponse_P2PMetrics_DhtMetrics)(nil), // 9: supernode.StatusResponse.P2PMetrics.DhtMetrics
+	(*StatusResponse_P2PMetrics_HandleCounters)(nil), // 10: supernode.StatusResponse.P2PMetrics.HandleCounters
+	(*StatusResponse_P2PMetrics_BanEntry)(nil), // 11: supernode.StatusResponse.P2PMetrics.BanEntry
+	(*StatusResponse_P2PMetrics_DatabaseStats)(nil), // 12: supernode.StatusResponse.P2PMetrics.DatabaseStats
+	(*StatusResponse_P2PMetrics_DiskStatus)(nil), // 13: supernode.StatusResponse.P2PMetrics.DiskStatus
+	nil, // 14: supernode.StatusResponse.P2PMetrics.NetworkHandleMetricsEntry
+	nil, // 15: supernode.StatusResponse.P2PMetrics.ConnPoolMetricsEntry
+	(*StatusResponse_P2PMetrics_DhtMetrics_StoreSuccessPoint)(nil), // 16: supernode.StatusResponse.P2PMetrics.DhtMetrics.StoreSuccessPoint
+	(*StatusResponse_P2PMetrics_DhtMetrics_BatchRetrievePoint)(nil), // 17: supernode.StatusResponse.P2PMetrics.DhtMetrics.BatchRetrievePoint
 }
 var file_supernode_status_proto_depIdxs = []int32{
 	2, // 0: supernode.StatusResponse.resources:type_name -> supernode.StatusResponse.Resources
-	3, // 1: supernode.StatusResponse.network:type_name -> supernode.StatusResponse.Network
-	4, // 2: supernode.StatusResponse.p2p_metrics:type_name -> supernode.StatusResponse.P2PMetrics
-	5, // 3: supernode.StatusResponse.Resources.cpu:type_name -> supernode.StatusResponse.Resources.CPU
-	6, // 4: supernode.StatusResponse.Resources.memory:type_name -> supernode.StatusResponse.Resources.Memory
-	7, // 5: supernode.StatusResponse.Resources.storage_volumes:type_name -> supernode.StatusResponse.Resources.Storage
-	8, // 6: supernode.StatusResponse.P2PMetrics.dht_metrics:type_name -> supernode.StatusResponse.P2PMetrics.DhtMetrics
-	13, // 7: supernode.StatusResponse.P2PMetrics.network_handle_metrics:type_name -> supernode.StatusResponse.P2PMetrics.NetworkHandleMetricsEntry
-	14, // 8: supernode.StatusResponse.P2PMetrics.conn_pool_metrics:type_name -> supernode.StatusResponse.P2PMetrics.ConnPoolMetricsEntry
-	10, // 9: supernode.StatusResponse.P2PMetrics.ban_list:type_name -> supernode.StatusResponse.P2PMetrics.BanEntry
-	11, // 10: supernode.StatusResponse.P2PMetrics.database:type_name -> supernode.StatusResponse.P2PMetrics.DatabaseStats
-	12, // 11: supernode.StatusResponse.P2PMetrics.disk:type_name -> supernode.StatusResponse.P2PMetrics.DiskStatus
-	15, // 12: supernode.StatusResponse.P2PMetrics.DhtMetrics.store_success_recent:type_name -> supernode.StatusResponse.P2PMetrics.DhtMetrics.StoreSuccessPoint
-	16, // 13: supernode.StatusResponse.P2PMetrics.DhtMetrics.batch_retrieve_recent:type_name -> supernode.StatusResponse.P2PMetrics.DhtMetrics.BatchRetrievePoint
-	9, // 14: supernode.StatusResponse.P2PMetrics.NetworkHandleMetricsEntry.value:type_name -> supernode.StatusResponse.P2PMetrics.HandleCounters
-	15, // [15:15] is the sub-list for method output_type
-	15, // [15:15] is the sub-list for method input_type
-	15, // [15:15] is the sub-list for extension type_name
-	15, // [15:15] is the sub-list for extension extendee
-	0, // [0:15] is the sub-list for field type_name
+	3, // 1: supernode.StatusResponse.running_tasks:type_name -> supernode.StatusResponse.ServiceTasks
+	4, // 2: supernode.StatusResponse.network:type_name -> supernode.StatusResponse.Network
+	5, // 3: supernode.StatusResponse.p2p_metrics:type_name -> supernode.StatusResponse.P2PMetrics
+	6, // 4: supernode.StatusResponse.Resources.cpu:type_name -> supernode.StatusResponse.Resources.CPU
+	7, // 5: supernode.StatusResponse.Resources.memory:type_name -> supernode.StatusResponse.Resources.Memory
+	8, // 6: supernode.StatusResponse.Resources.storage_volumes:type_name -> supernode.StatusResponse.Resources.Storage
+	9, // 7: supernode.StatusResponse.P2PMetrics.dht_metrics:type_name -> supernode.StatusResponse.P2PMetrics.DhtMetrics
+	14, // 8: supernode.StatusResponse.P2PMetrics.network_handle_metrics:type_name -> supernode.StatusResponse.P2PMetrics.NetworkHandleMetricsEntry
+	15, // 9: supernode.StatusResponse.P2PMetrics.conn_pool_metrics:type_name -> supernode.StatusResponse.P2PMetrics.ConnPoolMetricsEntry
+	11, // 10: supernode.StatusResponse.P2PMetrics.ban_list:type_name -> supernode.StatusResponse.P2PMetrics.BanEntry
+	12, // 11: supernode.StatusResponse.P2PMetrics.database:type_name -> supernode.StatusResponse.P2PMetrics.DatabaseStats
+	13, // 12: supernode.StatusResponse.P2PMetrics.disk:type_name -> supernode.StatusResponse.P2PMetrics.DiskStatus
+	16, // 13: supernode.StatusResponse.P2PMetrics.DhtMetrics.store_success_recent:type_name -> supernode.StatusResponse.P2PMetrics.DhtMetrics.StoreSuccessPoint
+	17, // 14: supernode.StatusResponse.P2PMetrics.DhtMetrics.batch_retrieve_recent:type_name -> supernode.StatusResponse.P2PMetrics.DhtMetrics.BatchRetrievePoint
+	10, // 15: supernode.StatusResponse.P2PMetrics.NetworkHandleMetricsEntry.value:type_name -> supernode.StatusResponse.P2PMetrics.HandleCounters
+	16, // [16:16] is the sub-list for method output_type
+	16, // [16:16] is the sub-list for method input_type
+	16, // [16:16] is the sub-list for extension type_name
+	16, // [16:16] is the sub-list for extension extendee
+	0, // [0:16] is the sub-list for field type_name
 }

 func init() { file_supernode_status_proto_init() }
@@ -1229,7 +1307,7 @@ func file_supernode_status_proto_init() {
 			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
 			RawDescriptor: unsafe.Slice(unsafe.StringData(file_supernode_status_proto_rawDesc), len(file_supernode_status_proto_rawDesc)),
 			NumEnums: 0,
-			NumMessages: 17,
+			NumMessages: 18,
 			NumExtensions: 0,
 			NumServices: 0,
 		},
diff --git a/pkg/common/blocktracker/block_tracker.go b/pkg/common/blocktracker/block_tracker.go
deleted file mode 100644
index 00f8c512..00000000
--- a/pkg/common/blocktracker/block_tracker.go
+++ /dev/null
@@ -1,121 +0,0 @@
-package blocktracker
-
-import (
-	"context"
-	"sync"
-	"time"
-
-	"github.com/LumeraProtocol/supernode/v2/pkg/errors"
-)
-
-const (
-	defaultRetries = 3
-	defaultDelayDurationBetweenRetries = 5 * time.Second
-	defaultRPCConnectTimeout = 15 * time.Second
-	// Update duration in case last update was success
-	defaultSuccessUpdateDuration = 10 * time.Second
-	// Update duration in case last update was failed - prevent too much call to Lumera
-	defaultFailedUpdateDuration = 5 * time.Second
-	defaultNextBlockTimeout = 30 * time.Minute
-)
-
-// LumeraClient defines interface functions BlockCntTracker expects from Lumera
-type LumeraClient interface {
-	// GetBlockCount returns block height of blockchain
-	GetBlockCount(ctx context.Context) (int32, error)
-}
-
-// BlockCntTracker defines a block tracker - that will keep current block height
-type BlockCntTracker struct {
-	mtx sync.Mutex
-	LumeraClient LumeraClient
-	curBlockCnt int32
-	lastSuccess time.Time
-	lastRetried time.Time
-	lastErr error
-	delayBetweenRetries time.Duration
-	retries int
-}
-
-// New returns an instance of BlockCntTracker
-func New(LumeraClient LumeraClient) *BlockCntTracker {
-	return &BlockCntTracker{
-		LumeraClient: LumeraClient,
-		curBlockCnt: 0,
-		delayBetweenRetries: defaultDelayDurationBetweenRetries,
-		retries: defaultRetries,
-	}
-}
-
-func (tracker *BlockCntTracker) refreshBlockCount(retries int) {
-	tracker.lastRetried = time.Now().UTC()
-	for i := 0; i < retries; i = i + 1 {
-		ctx, cancel := context.WithTimeout(context.Background(), defaultRPCConnectTimeout)
-		blockCnt, err := tracker.LumeraClient.GetBlockCount(ctx)
-		if err == nil {
-			tracker.curBlockCnt = blockCnt
-			tracker.lastSuccess = time.Now().UTC()
-			cancel()
-			tracker.lastErr = nil
-			return
-		}
-		cancel()
-
-		tracker.lastErr = err
-		// delay between retries
-		time.Sleep(tracker.delayBetweenRetries)
-	}
-
-}
-
-// GetBlockCount return current block count
-// it will get from cache if last refresh is small than defaultSuccessUpdateDuration
-// or will refresh it by call from Lumera daemon to get the latest one if defaultSuccessUpdateDuration expired
-func (tracker *BlockCntTracker) GetBlockCount() (int32, error) {
-	tracker.mtx.Lock()
-	defer tracker.mtx.Unlock()
-
-	shouldRefresh := false
-
-	if tracker.lastSuccess.After(tracker.lastRetried) {
-		if time.Now().UTC().After(tracker.lastSuccess.Add(defaultSuccessUpdateDuration)) {
-			shouldRefresh = true
-		}
-	} else {
-		// prevent update too much
-		if time.Now().UTC().After(tracker.lastRetried.Add(defaultFailedUpdateDuration)) {
-			shouldRefresh = true
-		}
-	}
-
-	if shouldRefresh {
-		tracker.refreshBlockCount(tracker.retries)
-	}
-
-	if tracker.curBlockCnt == 0 {
-		return 0, errors.Errorf("failed to get blockcount: %w", tracker.lastErr)
-	}
-
-	return tracker.curBlockCnt, nil
-}
-
-// WaitTillNextBlock will wait until next block height is greater than blockCnt
-func (tracker *BlockCntTracker) WaitTillNextBlock(ctx context.Context, blockCnt int32) error {
-	for {
-		select {
-		case <-ctx.Done():
-			return errors.Errorf("context done: %w", ctx.Err())
-		case <-time.After(defaultNextBlockTimeout):
-			return errors.Errorf("timeout waiting for next block")
-		case <-time.After(defaultSuccessUpdateDuration):
-			curBlockCnt, err := tracker.GetBlockCount()
-			if err != nil {
-				return errors.Errorf("failed to get blockcount: %w", err)
-			}
-
-			if curBlockCnt > blockCnt {
-				return nil
-			}
-		}
-	}
-}
diff --git a/pkg/common/blocktracker/block_tracker_test.go b/pkg/common/blocktracker/block_tracker_test.go
deleted file mode 100644
index b070a4b7..00000000
--- a/pkg/common/blocktracker/block_tracker_test.go
+++ /dev/null
@@ -1,97 +0,0 @@
-package blocktracker
-
-import (
-	"context"
-	"errors"
-	"strings"
-	"testing"
-	"time"
-
-	"github.com/stretchr/testify/assert"
-)
-
-type fakePastelClient struct {
-	retBlockCnt int32
-	retErr error
-}
-
-func (fake *fakePastelClient) GetBlockCount(_ context.Context) (int32, error) {
-	return fake.retBlockCnt, fake.retErr
-}
-
-func TestGetCountFirstTime(t *testing.T) {
-	tests := []struct {
-		name string
-		pastelClient *fakePastelClient
-		expectErr bool
-	}{
-		{
-			name: "success",
-			pastelClient: &fakePastelClient{
-				retBlockCnt: 10,
-				retErr: nil,
-			},
-			expectErr: false,
-		},
-		{
-			name: "fail",
-			pastelClient: &fakePastelClient{
-				retBlockCnt: 0,
-				retErr: errors.New("error"),
-			},
-			expectErr: true,
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			tracker := New(tt.pastelClient)
-			tracker.retries = 1
-			blkCnt, err := tracker.GetBlockCount()
-			assert.Equal(t, tt.pastelClient.retBlockCnt, blkCnt)
-			if tt.expectErr {
-				assert.True(t, strings.Contains(err.Error(), tt.pastelClient.retErr.Error()))
-			} else {
-				assert.Nil(t, err)
-			}
-		})
-	}
-}
-
-func TestGetBlockCountNoRefresh(t *testing.T) {
-	pastelClient := &fakePastelClient{
-		retBlockCnt: 10,
-		retErr: errors.New("error"),
-	}
-
-	expectedBlk := int32(1)
-	tracker := New(pastelClient)
-	tracker.retries = 1
-	tracker.curBlockCnt = expectedBlk
-	tracker.lastRetried = time.Now().UTC()
-	tracker.lastSuccess = time.Now().UTC()
-
-	blkCnt, err := tracker.GetBlockCount()
-	assert.Equal(t, expectedBlk, blkCnt)
-
-	assert.Nil(t, err)
-}
-
-func TestGetBlockCountRefresh(t *testing.T) {
-	expectedBlk := int32(10)
-	pastelClient := &fakePastelClient{
-		retBlockCnt: expectedBlk,
-		retErr: nil,
-	}
-
-	tracker := New(pastelClient)
-	tracker.retries = 1
-	tracker.curBlockCnt = 1
-	tracker.lastRetried = time.Now().UTC().Add(-defaultSuccessUpdateDuration)
-	tracker.lastSuccess = time.Now().UTC().Add(-defaultSuccessUpdateDuration)
-
-	blkCnt, err := tracker.GetBlockCount()
-	assert.Equal(t, expectedBlk, blkCnt)
-
-	assert.Nil(t, err)
-}
diff --git a/pkg/common/task/action.go b/pkg/common/task/action.go
deleted file mode 100644
index 227ebe35..00000000
--- a/pkg/common/task/action.go
+++ /dev/null
@@ -1,20 +0,0 @@
-package task
-
-import "context"
-
-// ActionFn represents a function that is run inside a goroutine.
-type ActionFn func(ctx context.Context) error
-
-// Action represents the action of the task.
-type Action struct {
-	fn ActionFn
-	doneCh chan struct{}
-}
-
-// NewAction returns a new Action instance.
-func NewAction(fn ActionFn) *Action {
-	return &Action{
-		fn: fn,
-		doneCh: make(chan struct{}),
-	}
-}
diff --git a/pkg/common/task/state/state.go b/pkg/common/task/state/state.go
deleted file mode 100644
index 05179a85..00000000
--- a/pkg/common/task/state/state.go
+++ /dev/null
@@ -1,174 +0,0 @@
-//go:generate mockery --name=State
-
-package state
-
-import (
-	"context"
-	"sync"
-	"time"
-
-	"github.com/LumeraProtocol/supernode/v2/pkg/errors"
-	"github.com/LumeraProtocol/supernode/v2/pkg/logtrace"
-	"github.com/LumeraProtocol/supernode/v2/pkg/storage/queries"
-	"github.com/LumeraProtocol/supernode/v2/pkg/types"
-)
-
-// State represents a state of the task.
-type State interface {
-	// Status returns the current status.
-	Status() *Status
-
-	// SetStatusNotifyFunc sets a function to be called after the state is updated.
-	SetStatusNotifyFunc(fn func(status *Status))
-
-	// RequiredStatus returns an error if the current status doen't match the given one.
-	RequiredStatus(subStatus SubStatus) error
-
-	// StatusHistory returns all history from the very beginning.
-	StatusHistory() []*Status
-
-	// UpdateStatus updates the status of the state by creating a new status with the given `status`.
-	UpdateStatus(subStatus SubStatus)
-
-	// SubscribeStatus returns a new subscription of the state.
-	SubscribeStatus() func() <-chan *Status
-
-	//SetStateLog set the wallet node task status log to the state status log
-	SetStateLog(statusLog types.Fields)
-
-	//InitialiseHistoryDB sets the connection to historyDB
-	InitialiseHistoryDB(store queries.LocalStoreInterface)
-}
-
-type state struct {
-	status *Status
-	history []*Status
-
-	notifyFn func(status *Status)
-	sync.RWMutex
-	subsCh []chan *Status
-	taskID string
-	statusLog types.Fields
-	historyDBStore queries.LocalStoreInterface
-}
-
-// Status implements State.Status()
-func (state *state) Status() *Status {
-	return state.status
-}
-
-// SetStatusNotifyFunc implements State.SetStatusNotifyFunc()
-func (state *state) SetStatusNotifyFunc(fn func(status *Status)) {
-	state.notifyFn = fn
-}
-
-// RequiredStatus implements State.RequiredStatus()
-func (state *state) RequiredStatus(subStatus SubStatus) error {
-	if state.status.Is(subStatus) {
-		return nil
-	}
-	return errors.Errorf("required status %q, current %q", subStatus, state.status)
-}
-
-// StatusHistory implements State.StatusHistory()
-func (state *state) StatusHistory() []*Status {
-	state.RLock()
-	defer state.RUnlock()
-
-	return append(state.history, state.status)
-}
-
-// UpdateStatus implements State.UpdateStatus()
-func (state *state) UpdateStatus(subStatus SubStatus) {
-	state.Lock()
-	defer state.Unlock()
-
-	status := NewStatus(subStatus)
-	state.history = append(state.history, state.status)
-	state.status = status
-
-	history := types.TaskHistory{CreatedAt: time.Now().UTC(), TaskID: state.taskID, Status: status.String()}
-	if state.statusLog.IsValid() {
-		history.Details = types.NewDetails(status.String(), state.statusLog)
-	}
-
-	if state.historyDBStore != nil {
-		if _, err := state.historyDBStore.InsertTaskHistory(history); err != nil {
-			logtrace.Error(context.Background(), "unable to store task status", logtrace.Fields{logtrace.FieldError: err.Error()})
-		}
-	} else {
-		store, err := queries.OpenHistoryDB()
-		if err != nil {
-			logtrace.Error(context.Background(), "error opening history db", logtrace.Fields{logtrace.FieldError: err.Error()})
-		}
-
-		if store != nil {
-			defer store.CloseHistoryDB(context.Background())
-			if _, err := store.InsertTaskHistory(history); err != nil {
-				logtrace.Error(context.Background(), "unable to store task status", logtrace.Fields{logtrace.FieldError: err.Error()})
-			}
-		}
-	}
-
-	if state.notifyFn != nil {
-		state.notifyFn(status)
-	}
-
-	for _, subCh := range state.subsCh {
-		subCh := subCh
-		go func() {
-			subCh <- status
-		}()
-	}
-}
-
-// SubscribeStatus implements State.SubscribeStatus()
-func (state *state) SubscribeStatus() func() <-chan *Status {
-	state.RLock()
-	defer state.RUnlock()
-
-	subCh := make(chan *Status)
-	state.subsCh = append(state.subsCh, subCh)
-
-	for _, status := range append(state.history, state.status) {
-		status := status
-		go func() {
-			subCh <- status
-		}()
-	}
-
-	sub := func() <-chan *Status {
-		return subCh
-	}
-	return sub
-}
-
-func (state *state) SetStateLog(statusLog types.Fields) {
-	state.statusLog = statusLog
-}
-
-func (state *state) InitialiseHistoryDB(storeInterface queries.LocalStoreInterface) {
-	state.historyDBStore = storeInterface
-}
-
-// New returns a new state instance.
-func New(subStatus SubStatus, taskID string) State {
-	store, err := queries.OpenHistoryDB()
-	if err != nil {
-		logtrace.Error(context.Background(), "error opening history db", logtrace.Fields{logtrace.FieldError: err.Error()})
-	}
-
-	if store != nil {
-		defer store.CloseHistoryDB(context.Background())
-
-		if _, err := store.InsertTaskHistory(types.TaskHistory{CreatedAt: time.Now().UTC(), TaskID: taskID,
-			Status: subStatus.String()}); err != nil {
-			logtrace.Error(context.Background(), "unable to store task status", logtrace.Fields{logtrace.FieldError: err.Error()})
-		}
-	}
-
-	return &state{
-		status: NewStatus(subStatus),
-		taskID: taskID,
-	}
-}
diff --git a/pkg/common/task/state/status.go b/pkg/common/task/state/status.go
deleted file mode 100644
index b1b00da6..00000000
--- a/pkg/common/task/state/status.go
+++ /dev/null
@@ -1,34 +0,0 @@
-//go:generate mockery --name=SubStatus
-
-package state
-
-import (
-	"fmt"
-	"time"
-)
-
-// SubStatus represents a sub-status that contains a description of the status.
-type SubStatus interface {
-	fmt.Stringer
-	IsFinal() bool
-	IsFailure() bool
-}
-
-// Status represents a state of the task.
-type Status struct {
-	CreatedAt time.Time
-	SubStatus
-}
-
-// Is returns true if the current `Status` matches to the given `statuses`.
-func (status *Status) Is(subStatus SubStatus) bool {
-	return status.SubStatus == subStatus
-}
-
-// NewStatus returns a new Status instance.
-func NewStatus(subStatus SubStatus) *Status {
-	return &Status{
-		CreatedAt: time.Now().UTC(),
-		SubStatus: subStatus,
-	}
-}
diff --git a/pkg/common/task/task.go b/pkg/common/task/task.go
deleted file mode 100644
index adf173e4..00000000
--- a/pkg/common/task/task.go
+++ /dev/null
@@ -1,143 +0,0 @@
-//go:generate mockery --name=Task
-
-package task
-
-import (
-	"context"
-	"sync"
-
-	"github.com/LumeraProtocol/supernode/v2/pkg/common/task/state"
-	"github.com/LumeraProtocol/supernode/v2/pkg/errgroup"
-	"github.com/LumeraProtocol/supernode/v2/pkg/errors"
-	"github.com/LumeraProtocol/supernode/v2/pkg/logtrace"
-	"github.com/LumeraProtocol/supernode/v2/pkg/random"
-)
-
-// Task represent a worker task.
-type Task interface {
-	state.State
-
-	// ID returns id of the task.
-	ID() string
-
-	// Run starts the task.
-	Run(ctx context.Context) error
-
-	// Cancel tells a task to abandon its work.
-	// Cancel may be called by multiple goroutines simultaneously.
-	// After the first call, subsequent calls to a Cancel do nothing.
-	Cancel()
-
-	// Done returns a channel when the task is canceled.
-	Done() <-chan struct{}
-
-	// RunAction waits for new actions, starts handling each of them in a new goroutine.
-	RunAction(ctx context.Context) error
-
-	// NewAction creates a new action and passes for the execution.
-	// It is used when it is necessary to run an action in the context of `Tasks` rather than the one who was called.
-	NewAction(fn ActionFn) <-chan struct{}
-
-	// CloseActionCh closes action ch
-	CloseActionCh()
-}
-
-type task struct {
-	state.State
-
-	id string
-
-	actionCh chan *Action
-
-	doneMu sync.Mutex
-	doneCh chan struct{}
-	closeOnce sync.Once
-}
-
-// ID implements Task.ID
-func (task *task) ID() string {
-	return task.id
-}
-
-// Run implements Task.Run
-func (task *task) Run(_ context.Context) error {
-	return errors.New("task default run func not implemented")
-}
-
-// Cancel implements Task.Cancel
-func (task *task) Cancel() {
-	task.doneMu.Lock()
-	defer task.doneMu.Unlock()
-
-	select {
-	case <-task.Done():
-		logtrace.Debug(context.Background(), "task cancelled", logtrace.Fields{"task_id": task.ID()})
-		return
-	default:
-		close(task.doneCh)
-	}
-}
-
-// Done implements Task.Done
-func (task *task) Done() <-chan struct{} {
-	return task.doneCh
-}
-
-// RunAction implements Task.RunAction
-func (task *task) RunAction(ctx context.Context) error {
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-
-	group, ctx := errgroup.WithContext(ctx)
-	for {
-		select {
-		case <-ctx.Done():
-			logtrace.Debug(ctx, "context done", logtrace.Fields{"task_id": task.ID()})
-		case <-task.Done():
-			logtrace.Debug(ctx, "task done", logtrace.Fields{"task_id": task.ID()})
-			cancel()
-		case action, ok := <-task.actionCh:
-			if !ok {
-				logtrace.Debug(ctx, "action channel closed", logtrace.Fields{"task_id": task.ID()})
-				return group.Wait()
-			}
-
-			currAction := action
-			group.Go(func() error {
-				defer close(currAction.doneCh)
-
-				return currAction.fn(ctx)
-			})
-			continue
-		}
-		break
-	}
-
-	return group.Wait()
-}
-
-// CloseActionCh safely closes the action channel
-func (task *task) CloseActionCh() {
-	task.closeOnce.Do(func() {
-		close(task.actionCh)
-	})
-}
-
-// NewAction implements Task.NewAction
-func (task *task) NewAction(fn ActionFn) <-chan struct{} {
-	act := NewAction(fn)
-	task.actionCh <- act
-	return act.doneCh
-}
-
-// New returns a new task instance.
-func New(status state.SubStatus) Task {
-	taskID, _ := random.String(8, random.Base62Chars)
-
-	return &task{
-		State: state.New(status, taskID),
-		id: taskID,
-		doneCh: make(chan struct{}),
-		actionCh: make(chan *Action),
-	}
-}
diff --git a/pkg/common/task/ticket.go b/pkg/common/task/ticket.go
deleted file mode 100644
index 561b8f0b..00000000
--- a/pkg/common/task/ticket.go
+++ /dev/null
@@ -1,13 +0,0 @@
-package task
-
-type CascadeTicket struct {
-	Creator string `json:"creator"`
-	CreatorSignature []byte `json:"creator_signature"`
-	DataHash string `json:"data_hash"`
-	ActionID string `json:"action_id"`
-	BlockHeight int64 `json:"block_height"`
-	BlockHash []byte `json:"block_hash"`
-	RQIDsIC uint32 `json:"rqids_ic"`
-	RQIDsMax int32 `json:"rqids_max"`
-	RQIDs []string `json:"rq_ids"`
-}
diff --git a/pkg/common/task/worker.go b/pkg/common/task/worker.go
deleted file mode 100644
index 14043079..00000000
--- a/pkg/common/task/worker.go
+++ /dev/null
@@ -1,144 +0,0 @@
-package task
-
-import (
-	"context"
-	"sync"
-	"time"
-
-	"github.com/LumeraProtocol/supernode/v2/pkg/errgroup"
-	"github.com/LumeraProtocol/supernode/v2/pkg/logtrace"
-)
-
-// Worker represents a pool of the task.
-type Worker struct {
-	sync.Mutex
-
-	tasks []Task
-	taskCh chan Task
-}
-
-// Tasks returns all tasks.
-func (worker *Worker) Tasks() []Task {
-	worker.Lock()
-	defer worker.Unlock()
-
-	// return a shallow copy to avoid data races
-	copied := make([]Task, len(worker.tasks))
-	copy(copied, worker.tasks)
-	return copied
-}
-
-// Task returns the task by the given id.
-func (worker *Worker) Task(taskID string) Task {
-	worker.Lock()
-	defer worker.Unlock()
-
-	for _, task := range worker.tasks {
-		if task.ID() == taskID {
-			return task
-		}
-	}
-	return nil
-}
-
-// AddTask adds the new task.
-func (worker *Worker) AddTask(task Task) {
-	worker.Lock()
-	defer worker.Unlock()
-
-	worker.tasks = append(worker.tasks, task)
-	worker.taskCh <- task
-
-	// Proactively remove the task once it's done to prevent lingering entries
-	go func(t Task) {
-		<-t.Done()
-		// remove promptly when the task signals completion/cancelation
-		worker.RemoveTask(t)
-	}(task)
-}
-
-// RemoveTask removes the task.
-func (worker *Worker) RemoveTask(subTask Task) {
-	worker.Lock()
-	defer worker.Unlock()
-
-	for i, task := range worker.tasks {
-		if task == subTask {
-			worker.tasks = append(worker.tasks[:i], worker.tasks[i+1:]...)
-			return
-		}
-	}
-}
-
-// Run waits for new tasks, starts handling each of them in a new goroutine.
-func (worker *Worker) Run(ctx context.Context) error {
-	group, _ := errgroup.WithContext(ctx) // Create an error group but ignore the derived context
-	// Background sweeper to prune finalized tasks that might linger
-	// even if the task's Run wasn't executed to completion.
-	sweeperCtx, sweeperCancel := context.WithCancel(ctx)
-	defer sweeperCancel()
-	go worker.cleanupLoop(sweeperCtx)
-	for {
-		select {
-		case <-ctx.Done():
-			logtrace.Warn(ctx, "Worker run stopping", logtrace.Fields{logtrace.FieldError: ctx.Err().Error()})
-			return group.Wait()
-		case t := <-worker.taskCh: // Rename here
-			currentTask := t // Capture the loop variable
-			group.Go(func() error {
-				defer func() {
-					if r := recover(); r != nil {
-						logtrace.Error(ctx, "Recovered from panic in common task's worker run", logtrace.Fields{"task": currentTask.ID(), "error": r})
-					}
-
-					logtrace.Debug(ctx, "Task Removed", logtrace.Fields{"task": currentTask.ID()})
-					// Remove the task from the worker's task list
-					worker.RemoveTask(currentTask)
-				}()
-
-				return currentTask.Run(ctx) // Use the captured variable
-			})
-		}
-	}
-}
-
-// NewWorker returns a new Worker instance.
-func NewWorker() *Worker {
-	w := &Worker{taskCh: make(chan Task)}
-	return w
-}
-
-// cleanupLoop periodically removes tasks that are in a final state for a grace period
-func (worker *Worker) cleanupLoop(ctx context.Context) {
-	const (
-		cleanupInterval = 30 * time.Second
-		finalTaskTTL = 2 * time.Minute
-	)
-
-	ticker := time.NewTicker(cleanupInterval)
-	defer ticker.Stop()
-
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case <-ticker.C:
-			now := time.Now()
-			worker.Lock()
-			// iterate and compact in-place
-			kept := worker.tasks[:0]
-			for _, t := range worker.tasks {
-				st := t.Status()
-				if st != nil && st.SubStatus != nil && st.SubStatus.IsFinal() {
-					if now.Sub(st.CreatedAt) >= finalTaskTTL {
-						// drop this finalized task
-						continue
-					}
-				}
-				kept = append(kept, t)
-			}
-			worker.tasks = kept
-			worker.Unlock()
-		}
-	}
-}
diff --git a/pkg/common/task/worker_test.go b/pkg/common/task/worker_test.go
deleted file mode 100644
index 4c5f21ac..00000000
--- a/pkg/common/task/worker_test.go
+++ /dev/null
@@ -1,147 +0,0 @@
-package task
-
-import (
-	"testing"
-
-	"github.com/stretchr/testify/assert"
-)
-
-func TestWorkerTasks(t *testing.T) {
-	t.Parallel()
-
-	type fields struct {
-		tasks []Task
-	}
-	tests := []struct {
-		name string
-		fields fields
-		want []Task
-	}{
-		{
-			name: "retrieve tasks",
-			fields: fields{
-				tasks: []Task{&task{id: "1"}, &task{id: "2"}},
-			},
-			want: []Task{&task{id: "1"}, &task{id: "2"}},
-		},
-	}
-
-	for _, tt := range tests {
-		tt := tt
-
-		t.Run(tt.name, func(t *testing.T) {
-			worker := &Worker{
-				tasks: tt.fields.tasks,
-			}
-			assert.Equal(t, tt.want, worker.Tasks())
-		})
-	}
-}
-
-func TestWorkerTask(t *testing.T) {
-	t.Parallel()
-
-	type fields struct {
-		tasks []Task
-	}
-	type args struct {
-		taskID string
-	}
-	tests := []struct {
-		name string
-		fields fields
-		args args
-		want Task
-	}{
-		{
-			name: "get task with id 1",
-			fields: fields{
-				tasks: []Task{&task{id: "1"}, &task{id: "2"}},
-			},
-			args: args{"2"},
-			want: &task{id: "2"},
-		},
-		{
-			name: "get not exist task",
-			fields: fields{
-				tasks: []Task{&task{id: "1"}, &task{id: "2"}},
-			},
-			args: args{"3"},
-			want: nil,
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			worker := &Worker{
-				tasks: tt.fields.tasks,
-			}
-			assert.Equal(t, tt.want, worker.Task(tt.args.taskID))
-		})
-	}
-}
-
-func TestWorkerAddTask(t *testing.T) {
-	t.Parallel()
-
-	type args struct {
-		task Task
-	}
-	tests := []struct {
-		name string
-		args args
-		want []Task
-	}{
-		{
-			name: "add task",
-			args: args{&task{id: "1"}},
-			want: []Task{&task{id: "1"}},
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			worker := &Worker{
-				taskCh: make(chan Task),
-			}
-
-			go func() {
-				worker.AddTask(tt.args.task)
-			}()
-
-			<-worker.taskCh
-			tasks := worker.tasks
-			assert.Equal(t, tt.want, tasks)
-
-		})
-	}
-}
-
-func TestWorkerRemoveTask(t *testing.T) {
-	t.Parallel()
-
-	type args struct {
-		subTask Task
-	}
-	tests := []struct {
-		name string
-		args args
-		want []Task
-	}{
-		{
-			name: "removed task",
-			args: args{&task{id: "1"}},
-			want: []Task{},
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			worker := &Worker{
-				tasks: []Task{tt.args.subTask},
-			}
-
-			worker.RemoveTask(tt.args.subTask)
-			assert.Equal(t, tt.want, worker.tasks)
-		})
-	}
-}
diff --git a/pkg/task/handle.go b/pkg/task/handle.go
new file mode 100644
index 00000000..33359e38
--- /dev/null
+++ b/pkg/task/handle.go
@@ -0,0 +1,65 @@
+package task
+
+import (
+	"context"
+	"sync"
+	"time"
+ + "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" +) + +// Handle manages a running task with an optional watchdog. +// It ensures Start and End are paired, logs start/end, and auto-ends on timeout. +type Handle struct { + service string + id string + stop chan struct{} + once sync.Once +} + +// Start starts tracking a task and returns a Handle that will ensure the +// task is ended. A watchdog is started to auto-end the task after timeout +// to avoid indefinitely stuck running tasks in status reporting. +func Start(ctx context.Context, service, id string, timeout time.Duration) *Handle { + if service == "" || id == "" { + return &Handle{} + } + Default.Start(service, id) + logtrace.Info(ctx, "task: started", logtrace.Fields{"service": service, "task_id": id}) + + g := &Handle{service: service, id: id, stop: make(chan struct{})} + if timeout > 0 { + go func() { + select { + case <-time.After(timeout): + // Auto-end if not already ended + g.endWith(ctx, true) + case <-g.stop: + // normal completion + } + }() + } + return g +} + +// End stops tracking the task. Safe to call multiple times. +func (g *Handle) End(ctx context.Context) { + g.endWith(ctx, false) +} + +// EndWith ends the guard and logs accordingly. If expired is true, +// it emits a warning and ends the task to avoid stuck status. +func (g *Handle) endWith(ctx context.Context, expired bool) { + if g == nil || g.service == "" || g.id == "" { + return + } + g.once.Do(func() { + close(g.stop) + Default.End(g.service, g.id) + if expired { + logtrace.Warn(ctx, "task: watchdog expired", logtrace.Fields{"service": g.service, "task_id": g.id}) + } else { + logtrace.Info(ctx, "task: ended", logtrace.Fields{"service": g.service, "task_id": g.id}) + } + }) +} diff --git a/pkg/task/task.go b/pkg/task/task.go new file mode 100644 index 00000000..6bf50e78 --- /dev/null +++ b/pkg/task/task.go @@ -0,0 +1,82 @@ +// Package task provides a lean, concurrency-safe, in-memory tracker for +// live tasks running inside a service. It is designed to be generic and +// reusable across multiple features (e.g., cascade upload/download) and +// only tracks tasks while the enclosing RPC/handler is alive. No +// persistence, progress reporting, or background processing is included. +package task + +import "sync" + +// Tracker defines a minimal interface for tracking live tasks per service. +// Implementations must be concurrency-safe. All methods are non-blocking +// and best-effort; invalid inputs are ignored. +type Tracker interface { + Start(service, taskID string) + End(service, taskID string) + Snapshot() map[string][]string +} + +// InMemoryTracker is a lean, concurrency-safe tracker of live tasks. +// It stores only in-memory state for the lifetime of the process and +// returns copies when asked for a snapshot to ensure isolation. +type InMemoryTracker struct { + mu sync.RWMutex + // service -> set(taskID) + data map[string]map[string]struct{} +} + +// Default is a package-level tracker instance for convenience. +var Default = New() + +// New creates and returns a new in-memory tracker. +func New() *InMemoryTracker { + return &InMemoryTracker{data: make(map[string]map[string]struct{})} +} + +// Start marks a task as running under a given service. Empty arguments +// are ignored. Calling Start with the same (service, taskID) pair is idempotent. 
+func (t *InMemoryTracker) Start(service, taskID string) {
+	if service == "" || taskID == "" {
+		return
+	}
+	t.mu.Lock()
+	m, ok := t.data[service]
+	if !ok {
+		m = make(map[string]struct{})
+		t.data[service] = m
+	}
+	m[taskID] = struct{}{}
+	t.mu.Unlock()
+}
+
+// End removes a running task under a given service. Empty arguments
+// are ignored. Removing a non-existent (service, taskID) pair is a no-op.
+func (t *InMemoryTracker) End(service, taskID string) {
+	if service == "" || taskID == "" {
+		return
+	}
+	t.mu.Lock()
+	if m, ok := t.data[service]; ok {
+		delete(m, taskID)
+		if len(m) == 0 {
+			delete(t.data, service)
+		}
+	}
+	t.mu.Unlock()
+}
+
+// Snapshot returns a copy of the current running tasks per service.
+// The returned map and slices are independent of internal state.
+func (t *InMemoryTracker) Snapshot() map[string][]string {
+	out := make(map[string][]string)
+	t.mu.RLock()
+	for svc, m := range t.data {
+		ids := make([]string, 0, len(m))
+		for id := range m {
+			ids = append(ids, id)
+		}
+		out[svc] = ids
+	}
+	t.mu.RUnlock()
+	return out
+}
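[Editor's note: for orientation, a minimal usage sketch of the tracker/handle API added above. The service name "cascade.register", the task ID, and the 30-minute watchdog are illustrative assumptions; task.Start, Handle.End, and Default.Snapshot are the APIs this PR introduces.]

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/LumeraProtocol/supernode/v2/pkg/task"
)

func main() {
	ctx := context.Background()

	// Pair Start with a deferred End; the watchdog auto-ends the task if the
	// handler hangs past the timeout, so status reporting never shows it stuck.
	h := task.Start(ctx, "cascade.register", "task-123", 30*time.Minute)
	defer h.End(ctx)

	// While the task runs, a status endpoint can read an isolated copy:
	fmt.Println(task.Default.Snapshot()) // map[cascade.register:[task-123]]
}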
diff --git a/pkg/task/task_test.go b/pkg/task/task_test.go
new file mode 100644
index 00000000..624b900f
--- /dev/null
+++ b/pkg/task/task_test.go
@@ -0,0 +1,160 @@
+package task
+
+import (
+	"context"
+	"sync"
+	"testing"
+	"time"
+)
+
+func TestStartEndSnapshot(t *testing.T) {
+	tr := New()
+
+	// Initially empty
+	if snap := tr.Snapshot(); len(snap) != 0 {
+		t.Fatalf("expected empty snapshot, got %#v", snap)
+	}
+
+	// Start two tasks under same service
+	tr.Start("svc", "id1")
+	tr.Start("svc", "id2")
+
+	snap := tr.Snapshot()
+	ids, ok := snap["svc"]
+	if !ok {
+		t.Fatalf("expected service 'svc' in snapshot")
+	}
+	if len(ids) != 2 {
+		t.Fatalf("expected 2 ids, got %d (%v)", len(ids), ids)
+	}
+
+	// End one task
+	tr.End("svc", "id1")
+	snap = tr.Snapshot()
+	ids = snap["svc"]
+	if len(ids) != 1 {
+		t.Fatalf("expected 1 id, got %d (%v)", len(ids), ids)
+	}
+	// Order is not guaranteed, so verify by set membership that id2 remains
+	m := map[string]struct{}{}
+	for _, v := range ids {
+		m[v] = struct{}{}
+	}
+	if _, ok := m["id2"]; !ok {
+		t.Fatalf("expected id2 to remain, got %v", ids)
+	}
+
+	// End last task
+	tr.End("svc", "id2")
+	snap = tr.Snapshot()
+	if _, ok := snap["svc"]; ok {
+		t.Fatalf("expected service removed after last task ended, got %v", snap)
+	}
+}
+
+func TestInvalidInputsAndIsolation(t *testing.T) {
+	tr := New()
+
+	// Invalid inputs should be ignored
+	tr.Start("", "id")
+	tr.Start("svc", "")
+	tr.End("", "id")
+	tr.End("svc", "")
+	if snap := tr.Snapshot(); len(snap) != 0 {
+		t.Fatalf("expected empty snapshot for invalid inputs, got %#v", snap)
+	}
+
+	// Snapshot must be a copy
+	tr.Start("svc", "id")
+	snap := tr.Snapshot()
+	// mutate snapshot map and slice
+	delete(snap, "svc")
+	snap2 := tr.Snapshot()
+	if _, ok := snap2["svc"]; !ok {
+		t.Fatalf("mutating snapshot should not affect tracker state")
+	}
+}
+
+// TestConcurrentAccessNoPanic ensures that concurrent Start/End/Snapshot
+// operations do not panic due to unsafe map access.
+func TestConcurrentAccessNoPanic(t *testing.T) {
+	tr := New()
+
+	// Run a mix of writers and readers concurrently.
+	var wg sync.WaitGroup
+	startWriters := 8
+	snapReaders := 4
+	loops := 1000
+
+	// Writers: repeatedly start/end tasks across a few services.
+	for w := 0; w < startWriters; w++ {
+		wg.Add(1)
+		go func(id int) {
+			defer wg.Done()
+			for i := 0; i < loops; i++ {
+				svc := "svc" + string('A'+rune(id%3)) // svcA, svcB, svcC
+				tid := svc + ":t" + fmtInt(i%5)
+				tr.Start(svc, tid)
+				if i%2 == 0 {
+					tr.End(svc, tid)
+				}
+			}
+		}(w)
+	}
+
+	// Readers: take snapshots concurrently.
+	for r := 0; r < snapReaders; r++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for i := 0; i < loops; i++ {
+				_ = tr.Snapshot()
+			}
+		}()
+	}
+
+	// If there is any unsynchronized map access, the Go runtime aborts the test.
+	done := make(chan struct{})
+	go func() { wg.Wait(); close(done) }()
+	select {
+	case <-done:
+		// ok
+	case <-time.After(5 * time.Second):
+		t.Fatal("concurrent access test timed out")
+	}
+}
+
+// fmtInt is a tiny int-to-string helper (valid for 0-9 only) that avoids
+// importing strconv; callers keep arguments within that range.
+func fmtInt(i int) string { return string('0' + rune(i)) }
+
+func TestHandleIdempotentAndWatchdog(t *testing.T) {
+	// Swap the default tracker to isolate
+	orig := Default
+	Default = New()
+	defer func() { Default = orig }()
+
+	ctx := context.Background()
+
+	// Idempotent End
+	g := Start(ctx, "svc.guard", "id-1", 0)
+	g.End(ctx)
+	g.End(ctx) // no panic, no double-end crash
+
+	// Watchdog auto-end: use a small timeout
+	g2 := Start(ctx, "svc.guard", "id-2", 50*time.Millisecond)
+	_ = g2 // keep the handle referenced until the timeout path fires
+	// Do not call End; let the watchdog fire
+	time.Sleep(120 * time.Millisecond)
+
+	// After watchdog, the task should not be listed
+	snap := Default.Snapshot()
+	if ids, ok := snap["svc.guard"]; ok {
+		// If still present, ensure id-2 is not in the list
+		for _, id := range ids {
+			if id == "id-2" {
+				t.Fatalf("expected watchdog to remove id-2 from svc.guard; snapshot: %v", ids)
+			}
+		}
+	}
+}
t.logger.Info(ctx, "skip supernode: not serving", "supernode", sn.GrpcEndpoint, "sn-address", sn.CosmosAddress, "iteration", idx+1) + continue + } if err := t.attemptRegistration(ctx, idx, sn, clientFactory, req); err != nil { // t.LogEvent(ctx, event.SDKRegistrationFailure, "registration with supernode failed", event.EventData{ diff --git a/sdk/task/download.go b/sdk/task/download.go index 2c727ae9..4adecf21 100644 --- a/sdk/task/download.go +++ b/sdk/task/download.go @@ -43,6 +43,8 @@ func (t *CascadeDownloadTask) Run(ctx context.Context) error { t.LogEvent(ctx, event.SDKTaskFailed, "task failed", event.EventData{event.KeyError: err.Error()}) return err } + // Deterministic per-action ordering to distribute load fairly + supernodes = orderSupernodesByDeterministicDistance(t.ActionID, supernodes) t.LogEvent(ctx, event.SDKSupernodesFound, "super-nodes found", event.EventData{event.KeyCount: len(supernodes)}) // 2 – download from super-nodes @@ -88,6 +90,12 @@ func (t *CascadeDownloadTask) downloadFromSupernodes(ctx context.Context, supern event.KeyIteration: iteration, }) + // Re-check serving status just-in-time to avoid calling a node that became busy/down + if !t.isServing(ctx, sn) { + t.logger.Info(ctx, "skip supernode: not serving", "supernode", sn.GrpcEndpoint, "sn-address", sn.CosmosAddress, "iteration", iteration) + continue + } + if err := t.attemptDownload(ctx, sn, clientFactory, req); err != nil { // Log failure and continue to next supernode t.LogEvent(ctx, event.SDKDownloadFailure, "download from super-node failed", event.EventData{ @@ -116,6 +124,16 @@ func (t *CascadeDownloadTask) attemptDownload( factory *net.ClientFactory, req *supernodeservice.CascadeSupernodeDownloadRequest, ) error { + // Recheck liveness/busyness just before attempting download to handle delays + if !t.isServing(parent, sn) { + // Emit a concise event; detailed rejection reasons are logged inside isServing + t.LogEvent(parent, event.SDKDownloadFailure, "precheck: supernode not serving/busy", event.EventData{ + event.KeySupernode: sn.GrpcEndpoint, + event.KeySupernodeAddress: sn.CosmosAddress, + event.KeyReason: "precheck_not_serving_or_busy", + }) + return fmt.Errorf("precheck: supernode not serving/busy") + } ctx, cancel := context.WithTimeout(parent, downloadTimeout) defer cancel() diff --git a/sdk/task/helpers.go b/sdk/task/helpers.go index 2ea8bcaa..2d2b7391 100644 --- a/sdk/task/helpers.go +++ b/sdk/task/helpers.go @@ -4,10 +4,13 @@ import ( "context" "encoding/base64" "fmt" + "math/big" "os" "path/filepath" + "sort" "strings" + "github.com/LumeraProtocol/supernode/v2/pkg/cascadekit" "github.com/LumeraProtocol/supernode/v2/sdk/adapters/lumera" ) @@ -136,3 +139,47 @@ func ensureOutputPathWithFilename(outputPath, filename string) string { // Otherwise, append the filename to the path return filepath.Join(outputPath, filename) } + +func orderSupernodesByDeterministicDistance(seed string, sns lumera.Supernodes) lumera.Supernodes { + if len(sns) == 0 || seed == "" { + return sns + } + // Precompute seed hash (blake3) + seedHash, err := cascadekit.ComputeBlake3Hash([]byte(seed)) + if err != nil { + return sns + } + + type nodeDist struct { + sn lumera.Supernode + distance *big.Int + } + nd := make([]nodeDist, 0, len(sns)) + for _, sn := range sns { + id := sn.CosmosAddress + if id == "" { + id = sn.GrpcEndpoint + } + nHash, err := cascadekit.ComputeBlake3Hash([]byte(id)) + if err != nil { + nd = append(nd, nodeDist{sn: sn, distance: new(big.Int).SetInt64(0)}) + continue + } + // XOR distance across min 
diff --git a/sdk/task/helpers.go b/sdk/task/helpers.go
index 2ea8bcaa..2d2b7391 100644
--- a/sdk/task/helpers.go
+++ b/sdk/task/helpers.go
@@ -4,10 +4,13 @@ import (
 	"context"
 	"encoding/base64"
 	"fmt"
+	"math/big"
 	"os"
 	"path/filepath"
+	"sort"
 	"strings"
 
+	"github.com/LumeraProtocol/supernode/v2/pkg/cascadekit"
 	"github.com/LumeraProtocol/supernode/v2/sdk/adapters/lumera"
 )
@@ -136,3 +139,47 @@ func ensureOutputPathWithFilename(outputPath, filename string) string {
 	// Otherwise, append the filename to the path
 	return filepath.Join(outputPath, filename)
 }
+
+func orderSupernodesByDeterministicDistance(seed string, sns lumera.Supernodes) lumera.Supernodes {
+	if len(sns) == 0 || seed == "" {
+		return sns
+	}
+	// Precompute seed hash (blake3)
+	seedHash, err := cascadekit.ComputeBlake3Hash([]byte(seed))
+	if err != nil {
+		return sns
+	}
+
+	type nodeDist struct {
+		sn       lumera.Supernode
+		distance *big.Int
+	}
+	nd := make([]nodeDist, 0, len(sns))
+	for _, sn := range sns {
+		id := sn.CosmosAddress
+		if id == "" {
+			id = sn.GrpcEndpoint
+		}
+		nHash, err := cascadekit.ComputeBlake3Hash([]byte(id))
+		if err != nil {
+			nd = append(nd, nodeDist{sn: sn, distance: new(big.Int).SetInt64(0)}) // hash failed: distance 0 sorts first
+			continue
+		}
+		// XOR distance across the shorter of the two hash lengths
+		l := len(seedHash)
+		if len(nHash) < l {
+			l = len(nHash)
+		}
+		xor := make([]byte, l)
+		for i := 0; i < l; i++ {
+			xor[i] = seedHash[i] ^ nHash[i]
+		}
+		nd = append(nd, nodeDist{sn: sn, distance: new(big.Int).SetBytes(xor)})
+	}
+	sort.Slice(nd, func(i, j int) bool { return nd[i].distance.Cmp(nd[j].distance) < 0 })
+	out := make(lumera.Supernodes, len(nd))
+	for i := range nd {
+		out[i] = nd[i].sn
+	}
+	return out
+}
diff --git a/sdk/task/task.go b/sdk/task/task.go
index bb402975..605819f2 100644
--- a/sdk/task/task.go
+++ b/sdk/task/task.go
@@ -8,7 +8,6 @@ import (
 	sdkmath "cosmossdk.io/math"
 	"github.com/LumeraProtocol/supernode/v2/pkg/errgroup"
-	"github.com/LumeraProtocol/supernode/v2/pkg/logtrace"
 	txmod "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/tx"
 	"github.com/LumeraProtocol/supernode/v2/sdk/adapters/lumera"
 	"github.com/LumeraProtocol/supernode/v2/sdk/config"
@@ -124,7 +123,7 @@ func (t *BaseTask) isServing(parent context.Context, sn lumera.Supernode) bool {
 		PeerType: t.config.Account.PeerType,
 	}).CreateClient(ctx, sn)
 	if err != nil {
-		logtrace.Debug(ctx, "Failed to create client for supernode", logtrace.Fields{logtrace.FieldMethod: "isServing"})
+		t.logger.Info(ctx, "reject supernode: client create failed", "reason", err.Error(), "endpoint", sn.GrpcEndpoint, "cosmos", sn.CosmosAddress)
 		return false
 	}
 	defer client.Close(ctx)
@@ -132,26 +131,41 @@ func (t *BaseTask) isServing(parent context.Context, sn lumera.Supernode) bool {
 	// First check gRPC health
 	resp, err := client.HealthCheck(ctx)
 	if err != nil || resp.Status != grpc_health_v1.HealthCheckResponse_SERVING {
+		statusStr := "nil"
+		if resp != nil {
+			statusStr = resp.Status.String()
+		}
+		t.logger.Info(ctx, "reject supernode: health not SERVING", "error", err, "status", statusStr)
 		return false
 	}
 
 	// Then check P2P peers count via status
 	status, err := client.GetSupernodeStatus(ctx)
 	if err != nil {
+		t.logger.Info(ctx, "reject supernode: status fetch failed", "error", err)
 		return false
 	}
 	if status.Network.PeersCount <= 1 {
+		t.logger.Info(ctx, "reject supernode: insufficient peers", "peers_count", status.Network.PeersCount)
+		return false
+	}
+	// Busy check: exclude supernodes that report any running tasks
+	if rt := status.GetRunningTasks(); len(rt) > 0 {
+		svc := rt[0].GetServiceName() // first busy service, for the log
+		t.logger.Info(ctx, "reject supernode: busy", "service", svc)
 		return false
 	}
 
 	denom := txmod.DefaultFeeDenom // base denom (micro), e.g., "ulume"
 	bal, err := t.client.GetBalance(ctx, sn.CosmosAddress, denom)
 	if err != nil || bal == nil || bal.Balance == nil {
+		t.logger.Info(ctx, "reject supernode: balance fetch failed or empty", "error", err)
 		return false
 	}
 	// Require at least 1 LUME = 10^6 micro (ulume)
 	min := sdkmath.NewInt(1_000_000)
 	if bal.Balance.Amount.LT(min) {
+		t.logger.Info(ctx, "reject supernode: insufficient balance", "amount", bal.Balance.Amount.String(), "min", min.String())
 		return false
 	}
"github.com/LumeraProtocol/supernode/v2/sn-manager/internal/version" "github.com/LumeraProtocol/supernode/v2/supernode/transport/gateway" + "google.golang.org/protobuf/encoding/protojson" ) // Global updater timing constants const ( // gatewayTimeout bounds the local gateway status probe - // gatewayTimeout = 15 * time.Second + gatewayTimeout = 15 * time.Second // updateCheckInterval is how often the periodic updater runs - updateCheckInterval = 5 * time.Minute + updateCheckInterval = 10 * time.Minute // forceUpdateAfter is the age threshold after a release is published // beyond which updates are applied regardless of normal gates (idle, policy) - forceUpdateAfter = 5 * time.Minute + forceUpdateAfter = 60 * time.Minute ) type AutoUpdater struct { @@ -132,37 +136,43 @@ func (u *AutoUpdater) ShouldUpdate(current, latest string) bool { return false } -// isGatewayIdle returns (idle, isError). When isError is true, -// the gateway could not be reliably checked (network/error/invalid). -// When isError is false and idle is false, the gateway is busy. func (u *AutoUpdater) isGatewayIdle() (bool, bool) { - // client := &http.Client{Timeout: gatewayTimeout} - - // resp, err := client.Get(u.gatewayURL) - // if err != nil { - // log.Printf("Failed to check gateway status: %v", err) - // // Error contacting gateway - // return false, true - // } - // defer resp.Body.Close() - - // if resp.StatusCode != http.StatusOK { - // log.Printf("Gateway returned status %d, not safe to update", resp.StatusCode) - // return false, true - // } - - // var status pb.StatusResponse - // body, err := io.ReadAll(resp.Body) - // if err != nil { - // log.Printf("Failed to read gateway response: %v", err) - // return false, true - // } - // if err := protojson.Unmarshal(body, &status); err != nil { - // log.Printf("Failed to decode gateway response: %v", err) - // return false, true - // } - - // // TEMP: tasks are not available in the new gateway endpoint; skip busy-check + client := &http.Client{Timeout: gatewayTimeout} + + resp, err := client.Get(u.gatewayURL) + if err != nil { + log.Printf("Failed to check gateway status: %v", err) + // Error contacting gateway + return false, true + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + log.Printf("Gateway returned status %d, not safe to update", resp.StatusCode) + return false, true + } + + var status pb.StatusResponse + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Printf("Failed to read gateway response: %v", err) + return false, true + } + if err := protojson.Unmarshal(body, &status); err != nil { + log.Printf("Failed to decode gateway response: %v", err) + return false, true + } + + // Idle when there are no running tasks across all services + if len(status.GetRunningTasks()) == 0 { + return true, false + } + for _, st := range status.GetRunningTasks() { + if st.GetTaskCount() > 0 || len(st.GetTaskIds()) > 0 { + log.Printf("Gateway busy: service=%s tasks=%d", st.GetServiceName(), st.GetTaskCount()) + return false, false + } + } return true, false } diff --git a/supernode/status/service.go b/supernode/status/service.go index 553f7e2d..5c9c4cd6 100644 --- a/supernode/status/service.go +++ b/supernode/status/service.go @@ -10,6 +10,7 @@ import ( "github.com/LumeraProtocol/supernode/v2/p2p/kademlia" "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" "github.com/LumeraProtocol/supernode/v2/pkg/lumera" + "github.com/LumeraProtocol/supernode/v2/pkg/task" "github.com/LumeraProtocol/supernode/v2/pkg/utils" 
"github.com/LumeraProtocol/supernode/v2/supernode/config" ) @@ -98,6 +99,17 @@ func (s *SupernodeStatusService) GetStatus(ctx context.Context, includeP2PMetric resp.Network.PeersCount = 0 resp.Network.PeerAddresses = []string{} + // Populate running tasks from the global in-memory tracker + if snap := task.Default.Snapshot(); len(snap) > 0 { + for svc, ids := range snap { + resp.RunningTasks = append(resp.RunningTasks, &pb.StatusResponse_ServiceTasks{ + ServiceName: svc, + TaskIds: ids, + TaskCount: int32(len(ids)), + }) + } + } + // Prepare optional P2P metrics container pm := &pb.StatusResponse_P2PMetrics{ DhtMetrics: &pb.StatusResponse_P2PMetrics_DhtMetrics{}, diff --git a/supernode/transport/grpc/cascade/handler.go b/supernode/transport/grpc/cascade/handler.go index 2a361a0f..e99477ac 100644 --- a/supernode/transport/grpc/cascade/handler.go +++ b/supernode/transport/grpc/cascade/handler.go @@ -7,10 +7,12 @@ import ( "io" "os" "path/filepath" + "time" pb "github.com/LumeraProtocol/supernode/v2/gen/supernode/action/cascade" "github.com/LumeraProtocol/supernode/v2/pkg/errors" "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" + tasks "github.com/LumeraProtocol/supernode/v2/pkg/task" cascadeService "github.com/LumeraProtocol/supernode/v2/supernode/cascade" "lukechampine.com/blake3" ) @@ -27,6 +29,12 @@ func NewCascadeActionServer(factory cascadeService.CascadeServiceFactory) *Actio // calculateOptimalChunkSize returns an optimal chunk size based on file size // to balance throughput and memory usage + + var ( + startedTask bool + handle *tasks.Handle + ) + func calculateOptimalChunkSize(fileSize int64) int { const ( minChunkSize = 64 * 1024 // 64 KB minimum @@ -124,6 +132,12 @@ func (server *ActionServer) Register(stream pb.CascadeService_RegisterServer) er case *pb.RegisterRequest_Metadata: metadata = x.Metadata logtrace.Debug(ctx, "received metadata", logtrace.Fields{"task_id": metadata.TaskId, "action_id": metadata.ActionId}) + // Start live task tracking on first metadata (covers remaining stream and processing) + if !startedTask { + startedTask = true + handle = tasks.Start(ctx, "cascade.upload", metadata.ActionId, 30*time.Minute) + defer handle.End(ctx) + } } } @@ -189,6 +203,10 @@ func (server *ActionServer) Download(req *pb.DownloadRequest, stream pb.CascadeS } logtrace.Debug(ctx, "download request received", fields) + // Start live task tracking for the entire download RPC (including file streaming) + dlHandle := tasks.Start(ctx, "cascade.download", req.GetActionId(), 30*time.Minute) + defer dlHandle.End(ctx) + // Prepare to capture decoded file path from task events var decodedFilePath string var tmpDir string