From adbdbdb7e195267da3fe4a1180dd0ae6bd2b7e38 Mon Sep 17 00:00:00 2001 From: Xiaozhe Yao Date: Sun, 27 Jul 2025 01:28:40 +0200 Subject: [PATCH 01/19] logging # connections --- Dockerfile.amd64 => meta/Dockerfile.amd64 | 2 +- meta/build_core_docker.sh | 1 + meta/build_docker.sh | 0 src/internal/protocol/clock.go | 17 +++++ src/internal/protocol/host.go | 78 ++++++++++++++++++++++- src/internal/server/crdt_handler.go | 17 +++++ src/internal/server/proxy_handler.go | 2 +- src/internal/server/server.go | 3 +- 8 files changed, 116 insertions(+), 4 deletions(-) rename Dockerfile.amd64 => meta/Dockerfile.amd64 (92%) create mode 100644 meta/build_core_docker.sh delete mode 100644 meta/build_docker.sh diff --git a/Dockerfile.amd64 b/meta/Dockerfile.amd64 similarity index 92% rename from Dockerfile.amd64 rename to meta/Dockerfile.amd64 index 9802057..755095e 100644 --- a/Dockerfile.amd64 +++ b/meta/Dockerfile.amd64 @@ -2,7 +2,7 @@ FROM golang:1.23 AS build WORKDIR /app -COPY . /app +COPY src/ /app/src RUN cd /app/src/ && make build-release FROM alpine:edge diff --git a/meta/build_core_docker.sh b/meta/build_core_docker.sh new file mode 100644 index 0000000..d8de63b --- /dev/null +++ b/meta/build_core_docker.sh @@ -0,0 +1 @@ +docker build -f meta/Dockerfile.amd64 -t ghcr.io/xiaozheyao/ocf:amd64-dev . && docker push ghcr.io/xiaozheyao/ocf:amd64-dev \ No newline at end of file diff --git a/meta/build_docker.sh b/meta/build_docker.sh deleted file mode 100644 index e69de29..0000000 diff --git a/src/internal/protocol/clock.go b/src/internal/protocol/clock.go index 0277407..f688bbf 100644 --- a/src/internal/protocol/clock.go +++ b/src/internal/protocol/clock.go @@ -43,5 +43,22 @@ func StartTicker() { } }) common.ReportError(err, "Error while creating cleaning ticker") + + // Add resource monitoring every 2 minutes + err = gocron.Every(2).Minutes().Do(func() { + GetResourceManagerStats() + + // Also log current connection count for easy monitoring + connectedPeers := ConnectedPeers() + allPeers := AllPeers() + common.Logger.Infof("Connection Summary: %d connected peers, %d total known peers", + len(connectedPeers), len(allPeers)) + + // Log if we have very few connections (potential issue) + if len(connectedPeers) < 3 { + common.Logger.Warnf("Low connection count detected: only %d connected peers", len(connectedPeers)) + } + }) + common.ReportError(err, "Error while creating resource monitoring ticker") <-gocron.Start() } diff --git a/src/internal/protocol/host.go b/src/internal/protocol/host.go index 0888874..cbf903e 100644 --- a/src/internal/protocol/host.go +++ b/src/internal/protocol/host.go @@ -19,6 +19,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" + rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" "github.com/libp2p/go-libp2p/p2p/security/noise" libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls" "github.com/spf13/viper" @@ -83,9 +84,42 @@ func newHost(ctx context.Context, seed int64, ds datastore.Batching) (host.Host, if err != nil { return nil, err } + + // Configure resource manager with higher limits + limits := rcmgr.DefaultLimits.AutoScale() + + // Increase connection limits significantly for a distributed system + systemLimits := rcmgr.ResourceLimits{ + ConnsInbound: 1000, // Allow up to 1000 inbound connections + ConnsOutbound: 1000, // Allow up to 1000 outbound connections + Conns: 2000, // Allow up to 2000 total connections + StreamsInbound: 10000, // Increase stream limits + StreamsOutbound: 10000, + Streams: 20000, + Memory: 1 << 30, // 1GB memory limit + } + + // Apply the custom limits + finalLimits := rcmgr.PartialLimitConfig{ + System: systemLimits, + // Keep default peer limits but increase them slightly + PeerDefault: rcmgr.ResourceLimits{ + ConnsInbound: 16, // Allow more connections per peer + ConnsOutbound: 16, + Conns: 32, + }, + }.Build(limits) + + // Create resource manager + mgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(finalLimits)) + if err != nil { + return nil, err + } + opts := []libp2p.Option{ libp2p.DefaultTransports, libp2p.Identity(priv), + libp2p.ResourceManager(mgr), // Use our custom resource manager // libp2p.ConnectionManager(connmgr), libp2p.NATPortMap(), libp2p.ListenAddrStrings( @@ -104,7 +138,23 @@ func newHost(ctx context.Context, seed int64, ds datastore.Batching) (host.Host, return ddht, err }), } - return libp2p.New(opts...) + + host, err := libp2p.New(opts...) + if err != nil { + return nil, err + } + + // Log connection events for debugging + host.Network().Notify(&network.NotifyBundle{ + ConnectedF: func(n network.Network, c network.Conn) { + common.Logger.Info("Connected to peer: ", c.RemotePeer(), " Total connections: ", len(n.Conns())) + }, + DisconnectedF: func(n network.Network, c network.Conn) { + common.Logger.Info("Disconnected from peer: ", c.RemotePeer(), " Total connections: ", len(n.Conns())) + }, + }) + + return host, nil } func newDHT(ctx context.Context, h host.Host, ds datastore.Batching) (*dualdht.DHT, error) { @@ -163,3 +213,29 @@ func ConnectedBootstraps() []string { } return bootstraps } + +// GetResourceManagerStats returns current resource usage statistics +func GetResourceManagerStats() { + host, _ := GetP2PNode(nil) + if rm := host.Network().ResourceManager(); rm != nil { + // Try to get stats if available + if statsGetter, ok := rm.(interface { + Stat() rcmgr.ResourceManagerStat + }); ok { + stats := statsGetter.Stat() + common.Logger.Infof("Resource Manager Stats - System: Conns=%d (in:%d out:%d), Streams=%d (in:%d out:%d), Memory=%d", + stats.System.NumConnsInbound+stats.System.NumConnsOutbound, + stats.System.NumConnsInbound, + stats.System.NumConnsOutbound, + stats.System.NumStreamsInbound+stats.System.NumStreamsOutbound, + stats.System.NumStreamsInbound, + stats.System.NumStreamsOutbound, + stats.System.Memory, + ) + } else { + common.Logger.Info("Resource Manager present but stats not available") + } + } else { + common.Logger.Info("No Resource Manager configured") + } +} diff --git a/src/internal/server/crdt_handler.go b/src/internal/server/crdt_handler.go index 34b3a4f..46ea5b9 100644 --- a/src/internal/server/crdt_handler.go +++ b/src/internal/server/crdt_handler.go @@ -25,6 +25,23 @@ func listBootstraps(c *gin.Context) { c.JSON(200, gin.H{"bootstraps": addrs}) } +func getResourceStats(c *gin.Context) { + // Call the resource manager stats function from protocol package + protocol.GetResourceManagerStats() + + // Also return current connection count + connectedPeers := protocol.ConnectedPeers() + allPeers := protocol.AllPeers() + + c.JSON(200, gin.H{ + "connected_peers": len(connectedPeers), + "total_peers_known": len(allPeers), + "connected_peer_details": connectedPeers, + "all_peer_details": allPeers, + "message": "Resource manager stats logged to console", + }) +} + func updateLocal(c *gin.Context) { var peer protocol.Peer c.BindJSON(&peer) diff --git a/src/internal/server/proxy_handler.go b/src/internal/server/proxy_handler.go index a93ddb7..7a542cd 100644 --- a/src/internal/server/proxy_handler.go +++ b/src/internal/server/proxy_handler.go @@ -54,7 +54,7 @@ func P2PForwardHandler(c *gin.Context) { Host: requestPeer, Path: requestPath, } - common.Logger.Info("Forwarding request to %s", target.String()) + common.Logger.Infof("Forwarding request to %s", target.String()) director := func(req *http.Request) { req.URL.Scheme = target.Scheme req.URL.Path = target.Path diff --git a/src/internal/server/server.go b/src/internal/server/server.go index c4e6bbe..6e77fbf 100644 --- a/src/internal/server/server.go +++ b/src/internal/server/server.go @@ -47,6 +47,7 @@ func StartServer() { crdtGroup.GET("/peers", listPeers) crdtGroup.GET("/peers_status", listPeersWithStatus) crdtGroup.GET("/bootstraps", listBootstraps) + crdtGroup.GET("/stats", getResourceStats) // Add resource manager stats endpoint crdtGroup.POST("/_node", updateLocal) crdtGroup.DELETE("/_node", deleteLocal) } @@ -77,7 +78,7 @@ func StartServer() { go func() { err := http.Serve(p2plistener, r) if err != nil { - common.Logger.Error("http.Serve: %s", err) + common.Logger.Errorf("http.Serve: %s", err) } }() go func() { From 94feafaed317e012cdae6ec36982c34b9ed1106e Mon Sep 17 00:00:00 2001 From: Elia Palme <2025999+theely@users.noreply.github.com> Date: Sat, 9 Aug 2025 05:54:47 +0200 Subject: [PATCH 02/19] Handles node disconnect (#8) * save connection status * newly added nodes are connected by default * clean up unused reachableOnly option * fix critical bug in get peer --- src/internal/protocol/clock.go | 17 +++++++++--- src/internal/protocol/crdt.go | 12 ++++++++- src/internal/protocol/host.go | 2 +- src/internal/protocol/node_table.go | 40 ++++++++++------------------- src/internal/server/crdt_handler.go | 3 ++- 5 files changed, 40 insertions(+), 34 deletions(-) diff --git a/src/internal/protocol/clock.go b/src/internal/protocol/clock.go index 0277407..41a8f89 100644 --- a/src/internal/protocol/clock.go +++ b/src/internal/protocol/clock.go @@ -1,6 +1,7 @@ package protocol import ( + "encoding/json" "math/rand" "ocf/internal/common" "ocf/internal/common/process" @@ -29,11 +30,19 @@ func StartTicker() { host, _ := GetP2PNode(nil) peers := host.Peerstore().Peers() // updateMyself() - for _, peer := range peers { + for _, peer_id := range peers { // check if peer is still connected - if peer != host.ID() && host.Network().Connectedness(peer) != network.Connected { - // delete peer from table - DeleteNodeTableHook(ds.NewKey(peer.String())) + peer, error := GetPeerFromTable(peer_id.String()) + if error == nil { + peer.Connected = true + if peer_id != host.ID() && host.Network().Connectedness(peer_id) != network.Connected { + common.Logger.Info("Peer:" + peer_id.String() + " got disconnected!") + peer.Connected = false + } + value, err := json.Marshal(peer) + if err == nil { + UpdateNodeTableHook(ds.NewKey(peer_id.String()), value) + } } } if !process.HealthCheck() { diff --git a/src/internal/protocol/crdt.go b/src/internal/protocol/crdt.go index 05f93ea..ca995c6 100644 --- a/src/internal/protocol/crdt.go +++ b/src/internal/protocol/crdt.go @@ -2,6 +2,7 @@ package protocol import ( "context" + "encoding/json" "fmt" "ocf/internal/common" "sync" @@ -79,7 +80,16 @@ func GetCRDTStore() (*crdt.Datastore, context.CancelFunc) { opts.RebroadcastInterval = 5 * time.Second opts.PutHook = func(k ds.Key, v []byte) { fmt.Printf("Added: [%s] -> %s\n", k, string(v)) - UpdateNodeTableHook(k, v) + var peer Peer + err := json.Unmarshal(v, &peer) + common.ReportError(err, "Error while unmarshalling peer") + peer.Connected = true + value, err := json.Marshal(peer) + if err == nil { + UpdateNodeTableHook(k, value) + } else { + common.Logger.Error("Error while marshalling peer", err) + } } opts.DeleteHook = func(k ds.Key) { fmt.Printf("Removed: [%s]\n", k) diff --git a/src/internal/protocol/host.go b/src/internal/protocol/host.go index 0888874..a6aa044 100644 --- a/src/internal/protocol/host.go +++ b/src/internal/protocol/host.go @@ -150,7 +150,7 @@ func AllPeers() []*PeerWithStatus { func ConnectedBootstraps() []string { var bootstraps = []string{} - dnt := GetNodeTable(false) + dnt := GetNodeTable() host, _ := GetP2PNode(nil) for _, p := range *dnt { if p.PublicAddress != "" { diff --git a/src/internal/protocol/node_table.go b/src/internal/protocol/node_table.go index d445ab0..25a480a 100644 --- a/src/internal/protocol/node_table.go +++ b/src/internal/protocol/node_table.go @@ -9,8 +9,6 @@ import ( "sync" ds "github.com/ipfs/go-datastore" - "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/peer" "github.com/spf13/viper" ) @@ -49,6 +47,8 @@ type Peer struct { Version string `json:"version"` PublicAddress string `json:"public_address"` Hardware common.HardwareSpec `json:"hardware"` + Connected bool `json:"connected"` + Load []int `json:"load"` } type PeerWithStatus struct { @@ -61,26 +61,10 @@ type NodeTable map[string]Peer var dnt *NodeTable -func GetNodeTable(reachableOnly bool) *NodeTable { +func GetNodeTable() *NodeTable { dntOnce.Do(func() { dnt = &NodeTable{} }) - if reachableOnly { - host, _ := GetP2PNode(nil) - // filter out the nodes that are not connected - for _, p := range *dnt { - if host.Network().Connectedness(peer.ID(p.ID)) != network.Connected && p.ID != host.ID().String() { - // try to dial the peer - conn, err := host.Network().DialPeer(context.Background(), peer.ID(p.ID)) - if err != nil { - common.Logger.Info("Peer: ", p.ID, " removed from table: ", err) - // delete(*dnt, key) - } else { - defer conn.Close() - } - } - } - } return dnt } @@ -133,7 +117,7 @@ func DeleteNodeTable() { } func UpdateNodeTableHook(key ds.Key, value []byte) { - table := *GetNodeTable(false) + table := *GetNodeTable() var peer Peer err := json.Unmarshal(value, &peer) common.ReportError(err, "Error while unmarshalling peer") @@ -141,13 +125,13 @@ func UpdateNodeTableHook(key ds.Key, value []byte) { } func DeleteNodeTableHook(key ds.Key) { - table := *GetNodeTable(false) + table := *GetNodeTable() delete(table, key.String()) } func GetPeerFromTable(peerId string) (Peer, error) { - table := *GetNodeTable(false) - peer, ok := table[peerId] + table := *GetNodeTable() + peer, ok := table["/"+peerId] if !ok { return Peer{}, errors.New("peer not found") } @@ -173,11 +157,13 @@ func GetService(name string) (Service, error) { func GetAllProviders(serviceName string) ([]Peer, error) { var providers []Peer - table := *GetNodeTable(false) + table := *GetNodeTable() for _, peer := range table { - for _, service := range peer.Service { - if service.Name == serviceName { - providers = append(providers, peer) + if peer.Connected { + for _, service := range peer.Service { + if service.Name == serviceName { + providers = append(providers, peer) + } } } } diff --git a/src/internal/server/crdt_handler.go b/src/internal/server/crdt_handler.go index 34b3a4f..865795e 100644 --- a/src/internal/server/crdt_handler.go +++ b/src/internal/server/crdt_handler.go @@ -28,6 +28,7 @@ func listBootstraps(c *gin.Context) { func updateLocal(c *gin.Context) { var peer protocol.Peer c.BindJSON(&peer) + peer.Connected = true protocol.UpdateNodeTable(peer) } @@ -42,5 +43,5 @@ func getDNT(c *gin.Context) { {ingest.TimestampField: time.Now(), "event": "DNT Lookup"}, } IngestEvents(events) - c.JSON(200, protocol.GetNodeTable(false)) + c.JSON(200, protocol.GetNodeTable()) } From 334b2c6167878c2c2d127294987fd96e08cc30e0 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 8 Aug 2025 23:57:02 -0400 Subject: [PATCH 03/19] Bump next from 15.1.7 to 15.2.4 in /apps/web (#4) Bumps [next](https://github.com/vercel/next.js) from 15.1.7 to 15.2.4. - [Release notes](https://github.com/vercel/next.js/releases) - [Changelog](https://github.com/vercel/next.js/blob/canary/release.js) - [Commits](https://github.com/vercel/next.js/compare/v15.1.7...v15.2.4) --- updated-dependencies: - dependency-name: next dependency-version: 15.2.4 dependency-type: direct:production ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- apps/web/package-lock.json | 80 +++++++++++++++++++------------------- apps/web/package.json | 2 +- 2 files changed, 41 insertions(+), 41 deletions(-) diff --git a/apps/web/package-lock.json b/apps/web/package-lock.json index 9ba3bd3..0213a80 100644 --- a/apps/web/package-lock.json +++ b/apps/web/package-lock.json @@ -19,7 +19,7 @@ "clsx": "^2.1.1", "embla-carousel-react": "^8.5.2", "lucide-react": "^0.475.0", - "next": "15.1.7", + "next": "15.2.4", "next-themes": "^0.4.4", "react": "^19.0.0", "react-dom": "^19.0.0", @@ -717,9 +717,9 @@ } }, "node_modules/@next/env": { - "version": "15.1.7", - "resolved": "https://registry.npmjs.org/@next/env/-/env-15.1.7.tgz", - "integrity": "sha512-d9jnRrkuOH7Mhi+LHav2XW91HOgTAWHxjMPkXMGBc9B2b7614P7kjt8tAplRvJpbSt4nbO1lugcT/kAaWzjlLQ==", + "version": "15.2.4", + "resolved": "https://registry.npmjs.org/@next/env/-/env-15.2.4.tgz", + "integrity": "sha512-+SFtMgoiYP3WoSswuNmxJOCwi06TdWE733D+WPjpXIe4LXGULwEaofiiAy6kbS0+XjM5xF5n3lKuBwN2SnqD9g==", "license": "MIT" }, "node_modules/@next/eslint-plugin-next": { @@ -733,9 +733,9 @@ } }, "node_modules/@next/swc-darwin-arm64": { - "version": "15.1.7", - "resolved": "https://registry.npmjs.org/@next/swc-darwin-arm64/-/swc-darwin-arm64-15.1.7.tgz", - "integrity": "sha512-hPFwzPJDpA8FGj7IKV3Yf1web3oz2YsR8du4amKw8d+jAOHfYHYFpMkoF6vgSY4W6vB29RtZEklK9ayinGiCmQ==", + "version": "15.2.4", + "resolved": "https://registry.npmjs.org/@next/swc-darwin-arm64/-/swc-darwin-arm64-15.2.4.tgz", + "integrity": "sha512-1AnMfs655ipJEDC/FHkSr0r3lXBgpqKo4K1kiwfUf3iE68rDFXZ1TtHdMvf7D0hMItgDZ7Vuq3JgNMbt/+3bYw==", "cpu": [ "arm64" ], @@ -749,9 +749,9 @@ } }, "node_modules/@next/swc-darwin-x64": { - "version": "15.1.7", - "resolved": "https://registry.npmjs.org/@next/swc-darwin-x64/-/swc-darwin-x64-15.1.7.tgz", - "integrity": "sha512-2qoas+fO3OQKkU0PBUfwTiw/EYpN+kdAx62cePRyY1LqKtP09Vp5UcUntfZYajop5fDFTjSxCHfZVRxzi+9FYQ==", + "version": "15.2.4", + "resolved": "https://registry.npmjs.org/@next/swc-darwin-x64/-/swc-darwin-x64-15.2.4.tgz", + "integrity": "sha512-3qK2zb5EwCwxnO2HeO+TRqCubeI/NgCe+kL5dTJlPldV/uwCnUgC7VbEzgmxbfrkbjehL4H9BPztWOEtsoMwew==", "cpu": [ "x64" ], @@ -765,9 +765,9 @@ } }, "node_modules/@next/swc-linux-arm64-gnu": { - "version": "15.1.7", - "resolved": "https://registry.npmjs.org/@next/swc-linux-arm64-gnu/-/swc-linux-arm64-gnu-15.1.7.tgz", - "integrity": "sha512-sKLLwDX709mPdzxMnRIXLIT9zaX2w0GUlkLYQnKGoXeWUhcvpCrK+yevcwCJPdTdxZEUA0mOXGLdPsGkudGdnA==", + "version": "15.2.4", + "resolved": "https://registry.npmjs.org/@next/swc-linux-arm64-gnu/-/swc-linux-arm64-gnu-15.2.4.tgz", + "integrity": "sha512-HFN6GKUcrTWvem8AZN7tT95zPb0GUGv9v0d0iyuTb303vbXkkbHDp/DxufB04jNVD+IN9yHy7y/6Mqq0h0YVaQ==", "cpu": [ "arm64" ], @@ -781,9 +781,9 @@ } }, "node_modules/@next/swc-linux-arm64-musl": { - "version": "15.1.7", - "resolved": "https://registry.npmjs.org/@next/swc-linux-arm64-musl/-/swc-linux-arm64-musl-15.1.7.tgz", - "integrity": "sha512-zblK1OQbQWdC8fxdX4fpsHDw+VSpBPGEUX4PhSE9hkaWPrWoeIJn+baX53vbsbDRaDKd7bBNcXRovY1hEhFd7w==", + "version": "15.2.4", + "resolved": "https://registry.npmjs.org/@next/swc-linux-arm64-musl/-/swc-linux-arm64-musl-15.2.4.tgz", + "integrity": "sha512-Oioa0SORWLwi35/kVB8aCk5Uq+5/ZIumMK1kJV+jSdazFm2NzPDztsefzdmzzpx5oGCJ6FkUC7vkaUseNTStNA==", "cpu": [ "arm64" ], @@ -797,9 +797,9 @@ } }, "node_modules/@next/swc-linux-x64-gnu": { - "version": "15.1.7", - "resolved": "https://registry.npmjs.org/@next/swc-linux-x64-gnu/-/swc-linux-x64-gnu-15.1.7.tgz", - "integrity": "sha512-GOzXutxuLvLHFDAPsMP2zDBMl1vfUHHpdNpFGhxu90jEzH6nNIgmtw/s1MDwpTOiM+MT5V8+I1hmVFeAUhkbgQ==", + "version": "15.2.4", + "resolved": "https://registry.npmjs.org/@next/swc-linux-x64-gnu/-/swc-linux-x64-gnu-15.2.4.tgz", + "integrity": "sha512-yb5WTRaHdkgOqFOZiu6rHV1fAEK0flVpaIN2HB6kxHVSy/dIajWbThS7qON3W9/SNOH2JWkVCyulgGYekMePuw==", "cpu": [ "x64" ], @@ -813,9 +813,9 @@ } }, "node_modules/@next/swc-linux-x64-musl": { - "version": "15.1.7", - "resolved": "https://registry.npmjs.org/@next/swc-linux-x64-musl/-/swc-linux-x64-musl-15.1.7.tgz", - "integrity": "sha512-WrZ7jBhR7ATW1z5iEQ0ZJfE2twCNSXbpCSaAunF3BKcVeHFADSI/AW1y5Xt3DzTqPF1FzQlwQTewqetAABhZRQ==", + "version": "15.2.4", + "resolved": "https://registry.npmjs.org/@next/swc-linux-x64-musl/-/swc-linux-x64-musl-15.2.4.tgz", + "integrity": "sha512-Dcdv/ix6srhkM25fgXiyOieFUkz+fOYkHlydWCtB0xMST6X9XYI3yPDKBZt1xuhOytONsIFJFB08xXYsxUwJLw==", "cpu": [ "x64" ], @@ -829,9 +829,9 @@ } }, "node_modules/@next/swc-win32-arm64-msvc": { - "version": "15.1.7", - "resolved": "https://registry.npmjs.org/@next/swc-win32-arm64-msvc/-/swc-win32-arm64-msvc-15.1.7.tgz", - "integrity": "sha512-LDnj1f3OVbou1BqvvXVqouJZKcwq++mV2F+oFHptToZtScIEnhNRJAhJzqAtTE2dB31qDYL45xJwrc+bLeKM2Q==", + "version": "15.2.4", + "resolved": "https://registry.npmjs.org/@next/swc-win32-arm64-msvc/-/swc-win32-arm64-msvc-15.2.4.tgz", + "integrity": "sha512-dW0i7eukvDxtIhCYkMrZNQfNicPDExt2jPb9AZPpL7cfyUo7QSNl1DjsHjmmKp6qNAqUESyT8YFl/Aw91cNJJg==", "cpu": [ "arm64" ], @@ -845,9 +845,9 @@ } }, "node_modules/@next/swc-win32-x64-msvc": { - "version": "15.1.7", - "resolved": "https://registry.npmjs.org/@next/swc-win32-x64-msvc/-/swc-win32-x64-msvc-15.1.7.tgz", - "integrity": "sha512-dC01f1quuf97viOfW05/K8XYv2iuBgAxJZl7mbCKEjMgdQl5JjAKJ0D2qMKZCgPWDeFbFT0Q0nYWwytEW0DWTQ==", + "version": "15.2.4", + "resolved": "https://registry.npmjs.org/@next/swc-win32-x64-msvc/-/swc-win32-x64-msvc-15.2.4.tgz", + "integrity": "sha512-SbnWkJmkS7Xl3kre8SdMF6F/XDh1DTFEhp0jRTj/uB8iPKoU2bb2NDfcu+iifv1+mxQEd1g2vvSxcZbXSKyWiQ==", "cpu": [ "x64" ], @@ -4945,12 +4945,12 @@ "license": "MIT" }, "node_modules/next": { - "version": "15.1.7", - "resolved": "https://registry.npmjs.org/next/-/next-15.1.7.tgz", - "integrity": "sha512-GNeINPGS9c6OZKCvKypbL8GTsT5GhWPp4DM0fzkXJuXMilOO2EeFxuAY6JZbtk6XIl6Ws10ag3xRINDjSO5+wg==", + "version": "15.2.4", + "resolved": "https://registry.npmjs.org/next/-/next-15.2.4.tgz", + "integrity": "sha512-VwL+LAaPSxEkd3lU2xWbgEOtrM8oedmyhBqaVNmgKB+GvZlCy9rgaEc+y2on0wv+l0oSFqLtYD6dcC1eAedUaQ==", "license": "MIT", "dependencies": { - "@next/env": "15.1.7", + "@next/env": "15.2.4", "@swc/counter": "0.1.3", "@swc/helpers": "0.5.15", "busboy": "1.6.0", @@ -4965,14 +4965,14 @@ "node": "^18.18.0 || ^19.8.0 || >= 20.0.0" }, "optionalDependencies": { - "@next/swc-darwin-arm64": "15.1.7", - "@next/swc-darwin-x64": "15.1.7", - "@next/swc-linux-arm64-gnu": "15.1.7", - "@next/swc-linux-arm64-musl": "15.1.7", - "@next/swc-linux-x64-gnu": "15.1.7", - "@next/swc-linux-x64-musl": "15.1.7", - "@next/swc-win32-arm64-msvc": "15.1.7", - "@next/swc-win32-x64-msvc": "15.1.7", + "@next/swc-darwin-arm64": "15.2.4", + "@next/swc-darwin-x64": "15.2.4", + "@next/swc-linux-arm64-gnu": "15.2.4", + "@next/swc-linux-arm64-musl": "15.2.4", + "@next/swc-linux-x64-gnu": "15.2.4", + "@next/swc-linux-x64-musl": "15.2.4", + "@next/swc-win32-arm64-msvc": "15.2.4", + "@next/swc-win32-x64-msvc": "15.2.4", "sharp": "^0.33.5" }, "peerDependencies": { diff --git a/apps/web/package.json b/apps/web/package.json index 487c2f6..b3c95ff 100644 --- a/apps/web/package.json +++ b/apps/web/package.json @@ -20,7 +20,7 @@ "clsx": "^2.1.1", "embla-carousel-react": "^8.5.2", "lucide-react": "^0.475.0", - "next": "15.1.7", + "next": "15.2.4", "next-themes": "^0.4.4", "react": "^19.0.0", "react-dom": "^19.0.0", From 7e1da1134a655c51ba25bdf3b1335c31c791b2fe Mon Sep 17 00:00:00 2001 From: Xiaozhe Yao Date: Sat, 9 Aug 2025 06:52:18 +0200 Subject: [PATCH 04/19] reannounce periodically --- src/internal/protocol/clock.go | 3 + src/internal/protocol/crdt.go | 4 + src/internal/protocol/host.go | 38 ++++++ src/internal/protocol/registrar.go | 74 ++++++++++- tokens/programs/tokens/src/lib.rs | 199 ++++++++++++++--------------- tokens/tests/anchor.test.rs | 115 +++++++++++++++++ 6 files changed, 326 insertions(+), 107 deletions(-) create mode 100644 tokens/tests/anchor.test.rs diff --git a/src/internal/protocol/clock.go b/src/internal/protocol/clock.go index b292b12..79a1190 100644 --- a/src/internal/protocol/clock.go +++ b/src/internal/protocol/clock.go @@ -66,6 +66,9 @@ func StartTicker() { // Log if we have very few connections (potential issue) if len(connectedPeers) < 3 { common.Logger.Warnf("Low connection count detected: only %d connected peers", len(connectedPeers)) + Reconnect() + // best-effort re-announce our services after trying to reconnect + ReannounceLocalServices() } }) common.ReportError(err, "Error while creating resource monitoring ticker") diff --git a/src/internal/protocol/crdt.go b/src/internal/protocol/crdt.go index ca995c6..d6d3368 100644 --- a/src/internal/protocol/crdt.go +++ b/src/internal/protocol/crdt.go @@ -112,6 +112,10 @@ func GetCRDTStore() (*crdt.Datastore, context.CancelFunc) { func Reconnect() { mode := viper.GetString("mode") + if ipfs == nil { + common.Logger.Warn("Reconnect requested but CRDT/IPFS not initialized yet; skipping") + return + } addsInfo, err := peer.AddrInfosFromP2pAddrs(getDefaultBootstrapPeers(nil, mode)...) common.ReportError(err, "Error while getting bootstrap peers") ipfs.Bootstrap(addsInfo) diff --git a/src/internal/protocol/host.go b/src/internal/protocol/host.go index 7ae9865..d2e2715 100644 --- a/src/internal/protocol/host.go +++ b/src/internal/protocol/host.go @@ -7,6 +7,7 @@ import ( "ocf/internal/common" "strconv" "sync" + "time" "github.com/ipfs/boxo/ipns" "github.com/ipfs/go-datastore" @@ -148,15 +149,52 @@ func newHost(ctx context.Context, seed int64, ds datastore.Batching) (host.Host, host.Network().Notify(&network.NotifyBundle{ ConnectedF: func(n network.Network, c network.Conn) { common.Logger.Info("Connected to peer: ", c.RemotePeer(), " Total connections: ", len(n.Conns())) + // On (re)connections, re-announce local services + go ReannounceLocalServices() }, DisconnectedF: func(n network.Network, c network.Conn) { common.Logger.Info("Disconnected from peer: ", c.RemotePeer(), " Total connections: ", len(n.Conns())) }, }) + // Start a background auto-reconnector that watches connectivity + go startAutoReconnect(ctx, host) + return host, nil } +// startAutoReconnect periodically checks if we lost connectivity and attempts to reconnect to bootstraps with backoff. +func startAutoReconnect(ctx context.Context, h host.Host) { + // exponential backoff parameters + minDelay := 5 * time.Second + maxDelay := 2 * time.Minute + delay := minDelay + for { + select { + case <-ctx.Done(): + return + case <-time.After(30 * time.Second): + // If very few or zero peers, try bootstrap + conns := h.Network().Conns() + if len(conns) == 0 { + common.Logger.Warn("No active P2P connections; attempting reconnect to bootstraps...") + Reconnect() + // after a reconnect attempt, wait with backoff if still disconnected + time.Sleep(delay) + if delay < maxDelay { + delay *= 2 + if delay > maxDelay { + delay = maxDelay + } + } + } else { + // reset backoff when connected + delay = minDelay + } + } + } +} + func newDHT(ctx context.Context, h host.Host, ds datastore.Batching) (*dualdht.DHT, error) { dhtOpts := []dualdht.Option{ dualdht.DHTOption(dht.NamespacedValidator("pk", record.PublicKeyValidator{})), diff --git a/src/internal/protocol/registrar.go b/src/internal/protocol/registrar.go index 672fada..beb0cd5 100644 --- a/src/internal/protocol/registrar.go +++ b/src/internal/protocol/registrar.go @@ -6,12 +6,58 @@ import ( "errors" "ocf/internal/common" "ocf/internal/platform" + "sync" "time" ds "github.com/ipfs/go-datastore" "github.com/spf13/viper" ) +// localServices keeps a thread-safe copy of services this node provides +// so we can re-announce them on reconnects +var ( + localServices []Service + localServicesLock = &sync.RWMutex{} +) + +// addLocalService appends (deduped) to localServices +func addLocalService(svc Service) { + localServicesLock.Lock() + defer localServicesLock.Unlock() + // simple dedupe on Name|Host|Port + key := svc.Name + "|" + svc.Host + "|" + svc.Port + exists := false + for i := range localServices { + k := localServices[i].Name + "|" + localServices[i].Host + "|" + localServices[i].Port + if k == key { + // merge identity groups (dedupe) + existing := make(map[string]struct{}) + for _, id := range localServices[i].IdentityGroup { + existing[id] = struct{}{} + } + for _, id := range svc.IdentityGroup { + if _, ok := existing[id]; !ok { + localServices[i].IdentityGroup = append(localServices[i].IdentityGroup, id) + } + } + exists = true + break + } + } + if !exists { + localServices = append(localServices, svc) + } +} + +// snapshotLocalServices returns a copy of current local services +func snapshotLocalServices() []Service { + localServicesLock.RLock() + defer localServicesLock.RUnlock() + out := make([]Service, len(localServices)) + copy(out, localServices) + return out +} + func RegisterLocalServices() { serviceName := viper.GetString("service.name") servicePort := viper.GetString("service.port") @@ -79,7 +125,9 @@ func provideService(service Service) { ctx := context.Background() store, _ := GetCRDTStore() key := ds.NewKey(host.ID().String()) - myself.Service = []Service{service} + // track locally and publish full set (deduped) + addLocalService(service) + myself.Service = snapshotLocalServices() if viper.GetString("public-addr") != "" { myself.PublicAddress = viper.GetString("public-addr") } @@ -103,3 +151,27 @@ func updateMyself() { common.Logger.Info("Updated myself in CRDT store: ", myself) } } + +// ReannounceLocalServices re-publishes this node's service entry, used after reconnects +func ReannounceLocalServices() { + host, _ := GetP2PNode(nil) + ctx := context.Background() + store, _ := GetCRDTStore() + key := ds.NewKey(host.ID().String()) + // refresh hardware and services + myself.Hardware.GPUs = platform.GetGPUInfo() + myself.Service = snapshotLocalServices() + if viper.GetString("public-addr") != "" { + myself.PublicAddress = viper.GetString("public-addr") + } + value, err := json.Marshal(myself) + if err != nil { + common.Logger.Error("Error marshalling self during reannounce: ", err) + return + } + if err := store.Put(ctx, key, value); err != nil { + common.Logger.Warn("Failed to reannounce local services: ", err) + } else { + common.Logger.Info("Re-announced local services to network") + } +} diff --git a/tokens/programs/tokens/src/lib.rs b/tokens/programs/tokens/src/lib.rs index 0351fa5..e5b8f9c 100644 --- a/tokens/programs/tokens/src/lib.rs +++ b/tokens/programs/tokens/src/lib.rs @@ -1,141 +1,128 @@ -#![allow(clippy::result_large_err)] - use anchor_lang::prelude::*; -use anchor_spl::associated_token::AssociatedToken; -use anchor_spl::token_interface::{ - self, Mint, MintTo, TokenAccount, TokenInterface, TransferChecked, +use anchor_spl::{ + associated_token::AssociatedToken, + metadata::{ + create_metadata_accounts_v3, mpl_token_metadata::types::DataV2, CreateMetadataAccountsV3, + Metadata as Metaplex, + }, + token::{mint_to, Mint, MintTo, Token, TokenAccount}, }; -declare_id!("6qNqxkRF791FXFeQwqYQLEzAbGiqDULC5SSHVsfRoG89"); +declare_id!("3SVic52pd75oKSaQKQvKjHK7pmyH1a4s2duCUVK7eXRV"); #[program] -pub mod anchor { - +mod token_minter { use super::*; + pub fn init_token(ctx: Context, metadata: InitTokenParams) -> Result<()> { + let seeds = &["mint".as_bytes(), &[ctx.bumps.mint]]; + let signer = [&seeds[..]]; - pub fn create_token(_ctx: Context, _token_name: String) -> Result<()> { - msg!("Create Token"); - Ok(()) - } - pub fn create_token_account(_ctx: Context) -> Result<()> { - msg!("Create Token Account"); - Ok(()) - } - pub fn create_associated_token_account( - _ctx: Context, - ) -> Result<()> { - msg!("Create Associated Token Account"); - Ok(()) - } - pub fn transfer_token(ctx: Context, amount: u64) -> Result<()> { - let cpi_accounts = TransferChecked { - from: ctx.accounts.from.to_account_info().clone(), - mint: ctx.accounts.mint.to_account_info().clone(), - to: ctx.accounts.to_ata.to_account_info().clone(), - authority: ctx.accounts.signer.to_account_info(), + let token_data: DataV2 = DataV2 { + name: metadata.name, + symbol: metadata.symbol, + uri: metadata.uri, + seller_fee_basis_points: 0, + creators: None, + collection: None, + uses: None, }; - let cpi_program = ctx.accounts.token_program.to_account_info(); - let cpi_context = CpiContext::new(cpi_program, cpi_accounts); - token_interface::transfer_checked(cpi_context, amount, ctx.accounts.mint.decimals)?; - msg!("Transfer Token"); + + let metadata_ctx = CpiContext::new_with_signer( + ctx.accounts.token_metadata_program.to_account_info(), + CreateMetadataAccountsV3 { + payer: ctx.accounts.payer.to_account_info(), + update_authority: ctx.accounts.mint.to_account_info(), + mint: ctx.accounts.mint.to_account_info(), + metadata: ctx.accounts.metadata.to_account_info(), + mint_authority: ctx.accounts.mint.to_account_info(), + system_program: ctx.accounts.system_program.to_account_info(), + rent: ctx.accounts.rent.to_account_info(), + }, + &signer, + ); + + create_metadata_accounts_v3(metadata_ctx, token_data, false, true, None)?; + + msg!("Token mint created successfully."); + Ok(()) } - pub fn mint_token(ctx: Context, amount: u64) -> Result<()> { - let cpi_accounts = MintTo { - mint: ctx.accounts.mint.to_account_info().clone(), - to: ctx.accounts.receiver.to_account_info().clone(), - authority: ctx.accounts.signer.to_account_info(), - }; - let cpi_program = ctx.accounts.token_program.to_account_info(); - let cpi_context = CpiContext::new(cpi_program, cpi_accounts); - token_interface::mint_to(cpi_context, amount)?; - msg!("Mint Token"); + + pub fn mint_tokens(ctx: Context, quantity: u64) -> Result<()> { + let seeds = &["mint".as_bytes(), &[ctx.bumps.mint]]; + let signer = [&seeds[..]]; + + mint_to( + CpiContext::new_with_signer( + ctx.accounts.token_program.to_account_info(), + MintTo { + authority: ctx.accounts.mint.to_account_info(), + to: ctx.accounts.destination.to_account_info(), + mint: ctx.accounts.mint.to_account_info(), + }, + &signer, + ), + quantity, + )?; + Ok(()) } + } -#[derive(Accounts)] -#[instruction(token_name: String)] -pub struct CreateToken<'info> { - #[account(mut)] - pub signer: Signer<'info>, - #[account( - init, - payer = signer, - mint::decimals = 6, - mint::authority = signer.key(), - seeds = [b"token-2022-token", signer.key().as_ref(), token_name.as_bytes()], - bump, - )] - pub mint: InterfaceAccount<'info, Mint>, - pub system_program: Program<'info, System>, - pub token_program: Interface<'info, TokenInterface>, +#[derive(AnchorSerialize, AnchorDeserialize, Debug, Clone)] +pub struct InitTokenParams { + pub name: String, + pub symbol: String, + pub uri: String, + pub decimals: u8, } +// 4. Define the context for each instruction #[derive(Accounts)] -pub struct CreateTokenAccount<'info> { +#[instruction( + params: InitTokenParams +)] +pub struct InitToken<'info> { #[account(mut)] - pub signer: Signer<'info>, - pub mint: InterfaceAccount<'info, Mint>, + pub metadata: UncheckedAccount<'info>, #[account( init, - token::mint = mint, - token::authority = signer, - payer = signer, - seeds = [b"researchcomputer2025", signer.key().as_ref(), mint.key().as_ref()], + seeds = [b"mint"], bump, + payer = payer, + mint::decimals = params.decimals, + mint::authority = mint, )] - pub token_account: InterfaceAccount<'info, TokenAccount>, + pub mint: Account<'info, Mint>, + #[account(mut)] + pub payer: Signer<'info>, + pub rent: Sysvar<'info, Rent>, pub system_program: Program<'info, System>, - pub token_program: Interface<'info, TokenInterface>, + pub token_program: Program<'info, Token>, + pub token_metadata_program: Program<'info, Metaplex>, } #[derive(Accounts)] -pub struct CreateAssociatedTokenAccount<'info> { - #[account(mut)] - pub signer: Signer<'info>, - pub mint: InterfaceAccount<'info, Mint>, +pub struct MintTokens<'info> { #[account( - init, - associated_token::mint = mint, - payer = signer, - associated_token::authority = signer, + mut, + seeds = [b"mint"], + bump, + mint::authority = mint, )] - pub token_account: InterfaceAccount<'info, TokenAccount>, - pub system_program: Program<'info, System>, - pub token_program: Interface<'info, TokenInterface>, - pub associated_token_program: Program<'info, AssociatedToken>, -} - -#[derive(Accounts)] - -pub struct TransferToken<'info> { - #[account(mut)] - pub signer: Signer<'info>, - #[account(mut)] - pub from: InterfaceAccount<'info, TokenAccount>, - pub to: SystemAccount<'info>, + pub mint: Account<'info, Mint>, #[account( - init, + init_if_needed, + payer = payer, associated_token::mint = mint, - payer = signer, - associated_token::authority = to + associated_token::authority = payer, )] - pub to_ata: InterfaceAccount<'info, TokenAccount>, + pub destination: Account<'info, TokenAccount>, #[account(mut)] - pub mint: InterfaceAccount<'info, Mint>, - pub token_program: Interface<'info, TokenInterface>, + pub payer: Signer<'info>, + pub rent: Sysvar<'info, Rent>, pub system_program: Program<'info, System>, + pub token_program: Program<'info, Token>, pub associated_token_program: Program<'info, AssociatedToken>, -} - -#[derive(Accounts)] -pub struct MintToken<'info> { - #[account(mut)] - pub signer: Signer<'info>, - #[account(mut)] - pub mint: InterfaceAccount<'info, Mint>, - #[account(mut)] - pub receiver: InterfaceAccount<'info, TokenAccount>, - pub token_program: Interface<'info, TokenInterface>, } \ No newline at end of file diff --git a/tokens/tests/anchor.test.rs b/tokens/tests/anchor.test.rs new file mode 100644 index 0000000..64087dc --- /dev/null +++ b/tokens/tests/anchor.test.rs @@ -0,0 +1,115 @@ +describe("Test Minter", () => { + // Metaplex Constants + const METADATA_SEED = "metadata"; + const TOKEN_METADATA_PROGRAM_ID = new web3.PublicKey( + "metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s" + ); + + // Constants from our program + const MINT_SEED = "mint"; + + // Data for our tests + const payer = pg.wallet.publicKey; + const metadata = { + name: "ATT", + symbol: "TEST", + uri: "https://5vfxc4tr6xoy23qefqbj4qx2adzkzapneebanhcalf7myvn5gzja.arweave.net/7UtxcnH13Y1uBCwCnkL6APKsge0hAgacQFl-zFW9NlI", + decimals: 9, + }; + const mintAmount = 10; + const [mint] = web3.PublicKey.findProgramAddressSync( + [Buffer.from(MINT_SEED)], + pg.PROGRAM_ID + ); + + const [metadataAddress] = web3.PublicKey.findProgramAddressSync( + [ + Buffer.from(METADATA_SEED), + TOKEN_METADATA_PROGRAM_ID.toBuffer(), + mint.toBuffer(), + ], + TOKEN_METADATA_PROGRAM_ID + ); + + // Test init token + it("initialize", async () => { + const info = await pg.connection.getAccountInfo(mint); + if (info) { + return; // Do not attempt to initialize if already initialized + } + console.log(" Mint not found. Attempting to initialize."); + + const context = { + metadata: metadataAddress, + mint, + payer, + rent: web3.SYSVAR_RENT_PUBKEY, + systemProgram: web3.SystemProgram.programId, + tokenProgram: anchor.utils.token.TOKEN_PROGRAM_ID, + tokenMetadataProgram: TOKEN_METADATA_PROGRAM_ID, + }; + + const tx = await pg.program.methods + .initToken(metadata) + .accounts(context) + .transaction(); + + const txHash = await web3.sendAndConfirmTransaction( + pg.connection, + tx, + [pg.wallet.keypair], + { skipPreflight: true } + ); + console.log(` https://explorer.solana.com/tx/${txHash}?cluster=devnet`); + const newInfo = await pg.connection.getAccountInfo(mint); + assert(newInfo, " Mint should be initialized."); + }); + + // Test mint tokens + it("mint tokens", async () => { + const destination = await anchor.utils.token.associatedAddress({ + mint: mint, + owner: payer, + }); + + let initialBalance: number; + try { + const balance = await pg.connection.getTokenAccountBalance(destination); + initialBalance = balance.value.uiAmount; + } catch { + // Token account not yet initiated has 0 balance + initialBalance = 0; + } + + const context = { + mint, + destination, + payer, + rent: web3.SYSVAR_RENT_PUBKEY, + systemProgram: web3.SystemProgram.programId, + tokenProgram: anchor.utils.token.TOKEN_PROGRAM_ID, + associatedTokenProgram: anchor.utils.token.ASSOCIATED_PROGRAM_ID, + }; + + const tx = await pg.program.methods + .mintTokens(new BN(mintAmount * 10 ** metadata.decimals)) + .accounts(context) + .transaction(); + const txHash = await web3.sendAndConfirmTransaction( + pg.connection, + tx, + [pg.wallet.keypair], + { skipPreflight: true } + ); + console.log(` https://explorer.solana.com/tx/${txHash}?cluster=devnet`); + + const postBalance = ( + await pg.connection.getTokenAccountBalance(destination) + ).value.uiAmount; + assert.equal( + initialBalance + mintAmount, + postBalance, + "Post balance should equal initial plus mint amount" + ); + }); +}); From 2919f491870a48e792b2faa863d32dc7a92393ae Mon Sep 17 00:00:00 2001 From: Elia Palme Date: Sat, 9 Aug 2025 15:14:01 +0200 Subject: [PATCH 05/19] Filter out disconnected peers --- src/internal/protocol/node_table.go | 10 ++++++++++ src/internal/server/crdt_handler.go | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/internal/protocol/node_table.go b/src/internal/protocol/node_table.go index 25a480a..8bfdb4e 100644 --- a/src/internal/protocol/node_table.go +++ b/src/internal/protocol/node_table.go @@ -138,6 +138,16 @@ func GetPeerFromTable(peerId string) (Peer, error) { return peer, nil } +func GetConnectedPeers() *NodeTable { + var connected = NodeTable{} + for id, p := range *GetNodeTable() { + if p.Connected { + connected[id] = p + } + } + return &connected +} + func GetService(name string) (Service, error) { host, _ := GetP2PNode(nil) store, _ := GetCRDTStore() diff --git a/src/internal/server/crdt_handler.go b/src/internal/server/crdt_handler.go index 7cd581b..b5ecc4a 100644 --- a/src/internal/server/crdt_handler.go +++ b/src/internal/server/crdt_handler.go @@ -60,5 +60,5 @@ func getDNT(c *gin.Context) { {ingest.TimestampField: time.Now(), "event": "DNT Lookup"}, } IngestEvents(events) - c.JSON(200, protocol.GetNodeTable()) + c.JSON(200, protocol.GetConnectedPeers()) } From a8355346ed9c11b76453f795824374e6074fbe08 Mon Sep 17 00:00:00 2001 From: Xiaozhe Yao Date: Sat, 9 Aug 2025 17:20:11 +0200 Subject: [PATCH 06/19] update ci & enable reannounce --- src/.gitignore | 3 +- src/Makefile | 6 +- src/bin/cmd/config.go | 43 ------- src/bin/cmd/init.go | 12 -- src/bin/cmd/root.go | 127 -------------------- src/bin/cmd/start.go | 23 ---- src/bin/cmd/update.go | 43 ------- src/bin/cmd/version.go | 19 --- src/bin/main.go | 21 ---- src/internal/common/constants_test.go | 40 ++++++ src/internal/common/filesystem_test.go | 34 ++++++ src/internal/common/logger_test.go | 8 ++ src/internal/common/process/process.go | 42 ++++--- src/internal/common/process/process_test.go | 63 ++++++++++ src/internal/common/requests_test.go | 23 ++++ src/internal/common/serialization_test.go | 14 +++ src/internal/protocol/bootstrap_test.go | 33 +++++ src/internal/protocol/clock.go | 33 +++++ src/internal/protocol/node_table.go | 12 ++ src/internal/protocol/node_table_test.go | 34 ++++++ src/internal/protocol/registrar_test.go | 18 +++ 21 files changed, 343 insertions(+), 308 deletions(-) delete mode 100644 src/bin/cmd/config.go delete mode 100644 src/bin/cmd/init.go delete mode 100644 src/bin/cmd/root.go delete mode 100644 src/bin/cmd/start.go delete mode 100644 src/bin/cmd/update.go delete mode 100644 src/bin/cmd/version.go delete mode 100644 src/bin/main.go create mode 100644 src/internal/common/constants_test.go create mode 100644 src/internal/common/filesystem_test.go create mode 100644 src/internal/common/logger_test.go create mode 100644 src/internal/common/process/process_test.go create mode 100644 src/internal/common/requests_test.go create mode 100644 src/internal/common/serialization_test.go create mode 100644 src/internal/protocol/bootstrap_test.go create mode 100644 src/internal/protocol/node_table_test.go create mode 100644 src/internal/protocol/registrar_test.go diff --git a/src/.gitignore b/src/.gitignore index d163863..4e9c87a 100644 --- a/src/.gitignore +++ b/src/.gitignore @@ -1 +1,2 @@ -build/ \ No newline at end of file +build/ +bin/ \ No newline at end of file diff --git a/src/Makefile b/src/Makefile index 0a4d691..1305145 100644 --- a/src/Makefile +++ b/src/Makefile @@ -24,7 +24,7 @@ endif # Dependency versions GOTESTSUM_VERSION ?= 0.4.2 -GOLANGCI_VERSION ?= 1.48.0 +GOLANGCI_VERSION ?= 1.60.3 GOLANG_VERSION ?= 1.14 @@ -162,6 +162,10 @@ bin/golangci-lint-${GOLANGCI_VERSION}: curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | BINARY=golangci-lint bash -s -- v${GOLANGCI_VERSION} @mv bin/golangci-lint $@ +.PHONY: lint +lint: bin/golangci-lint ## Run linters + bin/golangci-lint run ./... + release-%: TAG_PREFIX = v release-%: ifneq (${DRY}, 1) diff --git a/src/bin/cmd/config.go b/src/bin/cmd/config.go deleted file mode 100644 index 7ece4de..0000000 --- a/src/bin/cmd/config.go +++ /dev/null @@ -1,43 +0,0 @@ -package cmd - -type P2PConfig struct { - Port string `json:"port" yaml:"port"` -} - -type VaccumConfig struct { - Interval int `json:"interval" yaml:"interval"` -} - -type QueueConfig struct { - Port string `json:"port" yaml:"port"` -} - -type Config struct { - Path string `json:"path" yaml:"path"` - Port string `json:"port" yaml:"port"` - Name string `json:"name" yaml:"name"` - P2p P2PConfig `json:"p2p" yaml:"p2p"` - Vacuum VaccumConfig `json:"vacuum" yaml:"vacuum"` - Queue QueueConfig `json:"queue" yaml:"queue"` - Account AccountConfig `json:"account" yaml:"account"` - Seed string `json:"seed" yaml:"seed"` - TCPPort string `json:"tcp_port" yaml:"tcp_port"` - UDPPort string `json:"udp_port" yaml:"udp_port"` -} - -type AccountConfig struct { - Wallet string `json:"wallet" yaml:"wallet"` -} - -var defaultConfig = Config{ - Seed: "0", - Path: "", - Port: "8092", - Name: "relay", - TCPPort: "43905", - UDPPort: "59820", - P2p: P2PConfig{Port: "8093"}, - Vacuum: VaccumConfig{Interval: 10}, - Queue: QueueConfig{Port: "8094"}, - Account: AccountConfig{Wallet: ""}, -} diff --git a/src/bin/cmd/init.go b/src/bin/cmd/init.go deleted file mode 100644 index c56bd05..0000000 --- a/src/bin/cmd/init.go +++ /dev/null @@ -1,12 +0,0 @@ -package cmd - -import ( - "github.com/spf13/cobra" -) - -var initCmd = &cobra.Command{ - Use: "init", - Short: "Initialize the system, create the database and the config file", - Run: func(cmd *cobra.Command, args []string) { - - }} diff --git a/src/bin/cmd/root.go b/src/bin/cmd/root.go deleted file mode 100644 index 7c8baf5..0000000 --- a/src/bin/cmd/root.go +++ /dev/null @@ -1,127 +0,0 @@ -package cmd - -import ( - "fmt" - "ocf/internal/common" - "os" - "path" - "strconv" - - homedir "github.com/mitchellh/go-homedir" - "github.com/spf13/cobra" - "github.com/spf13/pflag" - "github.com/spf13/viper" -) - -var cfgFile string -var rootcmd = &cobra.Command{ - Use: "ocfcore", - Short: "ocfcore", - Long: ``, - PersistentPreRunE: func(cmd *cobra.Command, args []string) error { - return initConfig(cmd) - }, - Run: func(cmd *cobra.Command, args []string) { - err := cmd.Help() - if err != nil { - common.Logger.Error("Could not print help", "error", err) - } - }, -} - -//nolint:gochecknoinits -func init() { - rootcmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $HOME/.config/ocf/cfg.yaml)") - - startCmd.Flags().String("wallet.account", "", "wallet account") - startCmd.Flags().String("bootstrap.addr", "http://152.67.71.5:8092/v1/dnt/bootstraps", "bootstrap address") - startCmd.Flags().String("seed", "0", "Seed") - startCmd.Flags().String("mode", "node", "Mode (standalone, local, full)") - startCmd.Flags().String("tcpport", "43905", "TCP Port") - startCmd.Flags().String("udpport", "59820", "UDP Port") - startCmd.Flags().String("subprocess", "", "Subprocess to start") - startCmd.Flags().String("public-addr", "", "Public address if you have one (by setting this, you can be a bootstrap node)") - startCmd.Flags().String("service.name", "", "Service name") - startCmd.Flags().String("service.port", "", "Service port") - startCmd.Flags().Bool("cleanslate", true, "Clean slate") - rootcmd.AddCommand(initCmd) - rootcmd.AddCommand(startCmd) - rootcmd.AddCommand(versionCmd) - rootcmd.AddCommand(updateCmd) -} - -func initConfig(cmd *cobra.Command) error { - var home string - var err error - // Don't forget to read config either from cfgFile or from home directory! - if cfgFile != "" { - // Use config file from the flag. - viper.SetConfigFile(cfgFile) - // print out the config file - common.Logger.Info("Using config file: ", viper.ConfigFileUsed()) - } else { - // Find home directory. - home, err = homedir.Dir() - if err != nil { - fmt.Println(err) - os.Exit(1) - } - viper.SetConfigFile(path.Join(home, ".config", "ocf", "cfg.yaml")) - } - if err = viper.ReadInConfig(); err != nil { - viper.SetDefault("path", defaultConfig.Path) - viper.SetDefault("port", defaultConfig.Port) - viper.SetDefault("name", defaultConfig.Name) - viper.SetDefault("p2p", defaultConfig.P2p) - viper.SetDefault("tcpport", defaultConfig.TCPPort) - viper.SetDefault("udpport", defaultConfig.UDPPort) - viper.SetDefault("vacuum.interval", defaultConfig.Vacuum.Interval) - viper.SetDefault("queue.port", defaultConfig.Queue.Port) - configPath := path.Join(home, ".config", "ocf", "cfg.yaml") - err = os.MkdirAll(path.Dir(configPath), os.ModePerm) - if err != nil { - common.Logger.Error("Could not create config directory", "error", err) - os.Exit(1) - } - - if err = viper.SafeWriteConfigAs(configPath); err != nil { - if os.IsNotExist(err) { - err = viper.WriteConfigAs(configPath) - if err != nil { - common.Logger.Warn("Cannot write config file", "error", err) - } - } - } - } - // Bind each Cobra Flag to its associated Viper Key - cmd.Flags().VisitAll(func(flag *pflag.Flag) { - if flag.Changed || !viper.IsSet(flag.Name) { - switch flag.Value.Type() { - case "bool": - value, err := strconv.ParseBool(flag.Value.String()) - if err != nil { - viper.Set(flag.Name, flag.Value) - } else { - viper.Set(flag.Name, value) - } - case "int": - value, err := strconv.ParseInt(flag.Value.String(), 0, 64) - if err != nil { - viper.Set(flag.Name, flag.Value) - } else { - viper.Set(flag.Name, value) - } - default: - viper.Set(flag.Name, flag.Value) - } - } - }) - return nil -} - -func Execute() { - if err := rootcmd.Execute(); err != nil { - fmt.Println(err) - os.Exit(1) - } -} diff --git a/src/bin/cmd/start.go b/src/bin/cmd/start.go deleted file mode 100644 index efe2b97..0000000 --- a/src/bin/cmd/start.go +++ /dev/null @@ -1,23 +0,0 @@ -package cmd - -import ( - "ocf/internal/common" - "ocf/internal/protocol" - "ocf/internal/server" - - "github.com/spf13/cobra" - "github.com/spf13/viper" -) - -var startCmd = &cobra.Command{ - Use: "start", - Short: "Start listening for incoming connections", - Run: func(cmd *cobra.Command, args []string) { - // check if cleanslate is set - if viper.GetBool("cleanslate") { - // clean slate, by removing the database - common.Logger.Info("Cleaning slate") - protocol.ClearCRDTStore() - } - server.StartServer() - }} diff --git a/src/bin/cmd/update.go b/src/bin/cmd/update.go deleted file mode 100644 index 52ada5d..0000000 --- a/src/bin/cmd/update.go +++ /dev/null @@ -1,43 +0,0 @@ -package cmd - -import ( - "fmt" - "net/http" - "ocf/internal/common" - "runtime" - - "github.com/minio/selfupdate" - "github.com/spf13/cobra" -) - -func doUpdate() error { - // detect cpu arch - arch := runtime.GOARCH - url := "https://filedn.eu/lougUsdPvd1uJK2jfOYWogH/releases/ocf-" + arch - common.Logger.Info("Downloading from ", url) - resp, err := http.Get(url) - if err != nil { - return err - } - defer resp.Body.Close() - err = selfupdate.Apply(resp.Body, selfupdate.Options{}) - if err != nil { - // error handling - } - return err -} - -var updateCmd = &cobra.Command{ - Use: "update", - Short: "Update the Open Compute Binary", - Run: func(cmd *cobra.Command, args []string) { - fmt.Printf("current ocfcore version %s", common.JSONVersion.Version) - fmt.Printf(" (commit: %s)", common.JSONVersion.Commit) - fmt.Printf(" (built at: %s)", common.JSONVersion.Date) - fmt.Println() - err := doUpdate() - if err != nil { - common.Logger.Error("Error while updating: ", err) - } - }, -} diff --git a/src/bin/cmd/version.go b/src/bin/cmd/version.go deleted file mode 100644 index 96d1c45..0000000 --- a/src/bin/cmd/version.go +++ /dev/null @@ -1,19 +0,0 @@ -package cmd - -import ( - "fmt" - "ocf/internal/common" - - "github.com/spf13/cobra" -) - -var versionCmd = &cobra.Command{ - Use: "version", - Short: "Print the version of ocfcore", - Run: func(cmd *cobra.Command, args []string) { - fmt.Printf("ocfcore version %s", common.JSONVersion.Version) - fmt.Printf(" (commit: %s)", common.JSONVersion.Commit) - fmt.Printf(" (built at: %s)", common.JSONVersion.Date) - fmt.Println() - }, -} diff --git a/src/bin/main.go b/src/bin/main.go deleted file mode 100644 index d0c737f..0000000 --- a/src/bin/main.go +++ /dev/null @@ -1,21 +0,0 @@ -package main - -import ( - "ocf/bin/cmd" - "ocf/internal/common" -) - -var ( - // Populated during build - version = "dev" - commitHash = "?" - buildDate = "" - buildSecret = "" -) - -func main() { - common.JSONVersion.Version = version - common.JSONVersion.Commit = commitHash - common.JSONVersion.Date = buildDate - cmd.Execute() -} diff --git a/src/internal/common/constants_test.go b/src/internal/common/constants_test.go new file mode 100644 index 0000000..f36036d --- /dev/null +++ b/src/internal/common/constants_test.go @@ -0,0 +1,40 @@ +package common + +import ( + "os" + "path/filepath" + "strings" + "testing" +) + +func TestGetHomePathCreatesDir(t *testing.T) { + // override HOME to a temp dir + tmp, err := os.MkdirTemp("", "ocf_home_test") + if err != nil { + t.Fatal(err) + } + old := os.Getenv("HOME") + t.Cleanup(func() { _ = os.Setenv("HOME", old) }) + _ = os.Setenv("HOME", tmp) + + p := GetHomePath() + if !strings.HasSuffix(p, ".ocfcore") { + t.Fatalf("expected .ocfcore path, got %s", p) + } + if st, err := os.Stat(p); err != nil || !st.IsDir() { + t.Fatalf("expected directory to be created: %v, %v", st, err) + } +} + +func TestGetDBPath(t *testing.T) { + // fake HOME + tmp, _ := os.MkdirTemp("", "ocf_home_test2") + old := os.Getenv("HOME") + t.Cleanup(func() { _ = os.Setenv("HOME", old) }) + _ = os.Setenv("HOME", tmp) + + db := GetDBPath("node123") + if !strings.Contains(db, filepath.Join(".ocfcore", "ocfcore.node123.db")) { + t.Fatalf("unexpected db path: %s", db) + } +} diff --git a/src/internal/common/filesystem_test.go b/src/internal/common/filesystem_test.go new file mode 100644 index 0000000..65d0c19 --- /dev/null +++ b/src/internal/common/filesystem_test.go @@ -0,0 +1,34 @@ +package common + +import ( + "os" + "path/filepath" + "testing" +) + +func TestRemoveDirNonexistent(t *testing.T) { + tmp := filepath.Join(os.TempDir(), "ocf_nonexistent_dir_for_test") + // ensure it does not exist + _ = os.RemoveAll(tmp) + if err := RemoveDir(tmp); err != nil { + t.Fatalf("RemoveDir on nonexistent dir should not error: %v", err) + } +} + +func TestRemoveDirExisting(t *testing.T) { + dir, err := os.MkdirTemp("", "ocf_remove_dir_test") + if err != nil { + t.Fatal(err) + } + // create a nested file + nested := filepath.Join(dir, "file.txt") + if err := os.WriteFile(nested, []byte("hello"), 0644); err != nil { + t.Fatal(err) + } + if err := RemoveDir(dir); err != nil { + t.Fatalf("RemoveDir should remove directory: %v", err) + } + if _, err := os.Stat(dir); !os.IsNotExist(err) { + t.Fatalf("expected dir to be removed, stat err: %v", err) + } +} diff --git a/src/internal/common/logger_test.go b/src/internal/common/logger_test.go new file mode 100644 index 0000000..0c12e75 --- /dev/null +++ b/src/internal/common/logger_test.go @@ -0,0 +1,8 @@ +package common + +import "testing" + +func TestReportErrorNoPanic(t *testing.T) { + // should not panic on nil + ReportError(nil, "msg") +} diff --git a/src/internal/common/process/process.go b/src/internal/common/process/process.go index e53103c..4ddc98c 100644 --- a/src/internal/common/process/process.go +++ b/src/internal/common/process/process.go @@ -69,9 +69,13 @@ func (p *Process) Start() *Process { p.Kill() }() } - p.started = true - p.proc.Stdout = os.Stdout - p.proc.Stderr = os.Stderr + p.started = true + // If StreamOutput has been set, do not override stdout/stderr to ensure + // the caller can consume output via the returned scanner. + if !p.outputStreamSet { + p.proc.Stdout = os.Stdout + p.proc.Stderr = os.Stderr + } //Call the other functions to stream stdin and stdout err := p.proc.Start() p.pid = p.proc.Process.Pid @@ -79,8 +83,8 @@ func (p *Process) Start() *Process { if err != nil { panic(err) } - go p.awaitOutput() - go p.finishTimeOutOrDie() + go p.awaitOutput() + go p.finishTimeOutOrDie() return p } @@ -145,19 +149,21 @@ func (p *Process) StreamOutput() *bufio.Scanner { } func (p *Process) finishTimeOutOrDie() { - defer p.cleanup() - var result error - select { - case result = <-p.done: - case <-p.cancellationSignal: - log.Println("received cancellationSignal") - //NOT PORTABLE TO WINDOWS - err := p.proc.Process.Kill() - if err != nil { - log.Println(err) - } - } - p.returnCode <- result + var result error + select { + case result = <-p.done: + // Process finished naturally; we captured the Wait() result + case <-p.cancellationSignal: + log.Println("received cancellationSignal") + // NOT PORTABLE TO WINDOWS + if err := p.proc.Process.Kill(); err != nil { + log.Println(err) + } + // Ensure we propagate the actual process exit error after kill + result = <-p.done + } + p.returnCode <- result + p.cleanup() } func (p *Process) cleanup() { diff --git a/src/internal/common/process/process_test.go b/src/internal/common/process/process_test.go new file mode 100644 index 0000000..f8f1076 --- /dev/null +++ b/src/internal/common/process/process_test.go @@ -0,0 +1,63 @@ +package process + +import ( + "bufio" + "strings" + "testing" + "time" +) + +func TestProcessRunWait(t *testing.T) { + p := NewProcess("/bin/echo", "", false, "hello") + p.Start() + if err := p.Wait(); err != nil { + t.Fatalf("echo failed: %v", err) + } +} + +func TestProcessStreamOutput(t *testing.T) { + p := NewProcess("/bin/echo", "", false, "stream") + sc := p.StreamOutput() + p.Start() + var out string + for sc.Scan() { + out += sc.Text() + } + if !strings.Contains(out, "stream") { + t.Fatalf("expected stream in output, got %q", out) + } +} + +func TestProcessTimeoutKill(t *testing.T) { + p := NewProcess("/bin/sleep", "", false, "10") + p.SetTimeout(100 * time.Millisecond) + p.Start() + if err := p.Wait(); err == nil { + t.Fatalf("expected error due to kill/timeout") + } +} + +func TestProcessKillEarly(t *testing.T) { + p := NewProcess("/bin/sleep", "", false, "2") + // stream output before starting for coverage of guard path + _ = p.StreamOutput() + p.Start() + // give it a moment to start + time.Sleep(50 * time.Millisecond) + p.Kill() + _ = p.Wait() // may return error; ensure no panic +} + +func TestOpenInputStreamGuard(t *testing.T) { + p := NewProcess("/bin/echo", "", false, "x") + // open input before start + w, err := p.OpenInputStream() + if err != nil { + t.Fatalf("unexpected: %v", err) + } + // write something and close + bw := bufio.NewWriter(w) + _, _ = bw.WriteString("hi") + bw.Flush() + _ = w.Close() +} diff --git a/src/internal/common/requests_test.go b/src/internal/common/requests_test.go new file mode 100644 index 0000000..98a84e7 --- /dev/null +++ b/src/internal/common/requests_test.go @@ -0,0 +1,23 @@ +package common + +import ( + "net/http" + "net/http/httptest" + "testing" +) + +func TestRemoteGET(t *testing.T) { + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + _, _ = w.Write([]byte("ok")) + })) + defer s.Close() + + b, err := RemoteGET(s.URL) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if string(b) != "ok" { + t.Fatalf("unexpected body: %s", string(b)) + } +} diff --git a/src/internal/common/serialization_test.go b/src/internal/common/serialization_test.go new file mode 100644 index 0000000..a28b552 --- /dev/null +++ b/src/internal/common/serialization_test.go @@ -0,0 +1,14 @@ +package common + +import "testing" + +func TestDictionaryToBytes(t *testing.T) { + data := map[string]interface{}{"a": 1, "b": "x"} + b, err := DictionaryToBytes(data) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(b) == 0 || string(b)[0] != '{' { + t.Fatalf("expected json object, got: %s", string(b)) + } +} diff --git a/src/internal/protocol/bootstrap_test.go b/src/internal/protocol/bootstrap_test.go new file mode 100644 index 0000000..1f7619d --- /dev/null +++ b/src/internal/protocol/bootstrap_test.go @@ -0,0 +1,33 @@ +package protocol + +import ( + "testing" + + "github.com/multiformats/go-multiaddr" +) + +func TestGetDefaultBootstrapPeersStandalone(t *testing.T) { + // in standalone mode, should return empty slice even if provided nil + res := getDefaultBootstrapPeers(nil, "standalone") + if len(res) != 0 { + t.Fatalf("expected 0 peers, got %d", len(res)) + } +} + +func TestGetDefaultBootstrapPeersLocal(t *testing.T) { + res := getDefaultBootstrapPeers(nil, "local") + if len(res) != 1 { + t.Fatalf("expected 1 local peer, got %d", len(res)) + } + // ensure it's a valid multiaddr + if _, err := multiaddr.NewMultiaddr(res[0].String()); err != nil { + t.Fatalf("invalid multiaddr: %v", err) + } +} + +func TestGetDefaultBootstrapPeersExplicit(t *testing.T) { + res := getDefaultBootstrapPeers([]string{"/ip4/127.0.0.1/tcp/1234"}, "any") + if len(res) != 1 { + t.Fatalf("expected 1 explicit peer, got %d", len(res)) + } +} diff --git a/src/internal/protocol/clock.go b/src/internal/protocol/clock.go index 79a1190..24a837c 100644 --- a/src/internal/protocol/clock.go +++ b/src/internal/protocol/clock.go @@ -6,6 +6,7 @@ import ( "ocf/internal/common" "ocf/internal/common/process" "os" + "time" ds "github.com/ipfs/go-datastore" "github.com/jasonlvhit/gocron" @@ -39,6 +40,8 @@ func StartTicker() { common.Logger.Info("Peer:" + peer_id.String() + " got disconnected!") peer.Connected = false } + // update last seen timestamp + peer.LastSeen = time.Now().Unix() value, err := json.Marshal(peer) if err == nil { UpdateNodeTableHook(ds.NewKey(peer_id.String()), value) @@ -70,6 +73,36 @@ func StartTicker() { // best-effort re-announce our services after trying to reconnect ReannounceLocalServices() } + + // Cleanup: remove peers that have been disconnected for a long time + // Define staleness threshold + staleAfter := 10 * time.Minute + table := *GetNodeTable() + now := time.Now().Unix() + for id, p := range table { + if !p.Connected && p.LastSeen > 0 { + if time.Unix(p.LastSeen, 0).Add(staleAfter).Before(time.Now()) { + common.Logger.Warnf("Removing stale peer %s (last seen %v)", id, time.Unix(p.LastSeen, 0)) + DeleteNodeTableHook(ds.NewKey(id)) + } + } + // Also mark peers with very old LastSeen as disconnected + if p.Connected && p.LastSeen > 0 && time.Unix(p.LastSeen, 0).Add(2*time.Minute).Before(time.Now()) { + p.Connected = false + value, err := json.Marshal(p) + if err == nil { + UpdateNodeTableHook(ds.NewKey(id), value) + } + } + // If LastSeen is zero, initialize it now + if p.LastSeen == 0 { + p.LastSeen = now + value, err := json.Marshal(p) + if err == nil { + UpdateNodeTableHook(ds.NewKey(id), value) + } + } + } }) common.ReportError(err, "Error while creating resource monitoring ticker") <-gocron.Start() diff --git a/src/internal/protocol/node_table.go b/src/internal/protocol/node_table.go index 25a480a..c0a2788 100644 --- a/src/internal/protocol/node_table.go +++ b/src/internal/protocol/node_table.go @@ -7,6 +7,7 @@ import ( "ocf/internal/common" "ocf/internal/platform" "sync" + "time" ds "github.com/ipfs/go-datastore" "github.com/spf13/viper" @@ -121,6 +122,16 @@ func UpdateNodeTableHook(key ds.Key, value []byte) { var peer Peer err := json.Unmarshal(value, &peer) common.ReportError(err, "Error while unmarshalling peer") + // Preserve locally computed connectivity status if we already know this peer + if existing, ok := table[key.String()]; ok { + peer.Connected = existing.Connected + // If LastSeen is missing in the update, keep the existing one + if peer.LastSeen == 0 { + peer.LastSeen = existing.LastSeen + } + } + // Always update LastSeen on any CRDT update we receive for that peer + peer.LastSeen = time.Now().Unix() table[key.String()] = peer } @@ -181,6 +192,7 @@ func InitializeMyself() { myself = Peer{ ID: host.ID().String(), PublicAddress: viper.GetString("public-addr"), + LastSeen: time.Now().Unix(), } myself.Hardware.GPUs = platform.GetGPUInfo() value, err := json.Marshal(myself) diff --git a/src/internal/protocol/node_table_test.go b/src/internal/protocol/node_table_test.go new file mode 100644 index 0000000..d7beda7 --- /dev/null +++ b/src/internal/protocol/node_table_test.go @@ -0,0 +1,34 @@ +package protocol + +import ( + "encoding/json" + "testing" + + ds "github.com/ipfs/go-datastore" +) + +func TestUpdateNodeTableHookAndGetPeer(t *testing.T) { + _ = GetNodeTable() + p := Peer{ID: "peer1", PublicAddress: "1.2.3.4"} + b, _ := json.Marshal(p) + UpdateNodeTableHook(ds.NewKey("peer1"), b) + + got, err := GetPeerFromTable("peer1") + if err != nil { + t.Fatalf("unexpected: %v", err) + } + if got.PublicAddress != "1.2.3.4" { + t.Fatalf("unexpected peer: %+v", got) + } +} + +func TestDeleteNodeTableHook(t *testing.T) { + table := GetNodeTable() + p := Peer{ID: "peer2", PublicAddress: "5.6.7.8"} + b, _ := json.Marshal(p) + UpdateNodeTableHook(ds.NewKey("peer2"), b) + DeleteNodeTableHook(ds.NewKey("peer2")) + if _, ok := (*table)["/peer2"]; ok { + t.Fatalf("expected peer2 deleted") + } +} diff --git a/src/internal/protocol/registrar_test.go b/src/internal/protocol/registrar_test.go new file mode 100644 index 0000000..4dd5546 --- /dev/null +++ b/src/internal/protocol/registrar_test.go @@ -0,0 +1,18 @@ +package protocol + +import "testing" + +func TestLocalServiceSnapshot(t *testing.T) { + // start with empty registry + localServices = nil + addLocalService(Service{Name: "llm", Host: "localhost", Port: "8000", IdentityGroup: []string{"model=a"}}) + addLocalService(Service{Name: "llm", Host: "localhost", Port: "8000", IdentityGroup: []string{"model=b"}}) + + snap := snapshotLocalServices() + if len(snap) != 1 { + t.Fatalf("expected 1 service after dedupe, got %d", len(snap)) + } + if len(snap[0].IdentityGroup) != 2 { + t.Fatalf("expected merged identity groups, got %v", snap[0].IdentityGroup) + } +} From 1cbdce19f2b41855f7e3d1bfe0b8e4e0a57f9abb Mon Sep 17 00:00:00 2001 From: Xiaozhe Yao Date: Sat, 9 Aug 2025 17:27:01 +0200 Subject: [PATCH 07/19] enable ci --- .github/workflows/ci.yml | 63 +++++++++++++++++++ src/.gitignore | 3 +- src/Makefile | 6 +- src/entry/cmd/config.go | 43 +++++++++++++ src/entry/cmd/init.go | 12 ++++ src/entry/cmd/root.go | 127 +++++++++++++++++++++++++++++++++++++++ src/entry/cmd/start.go | 23 +++++++ src/entry/cmd/update.go | 43 +++++++++++++ src/entry/cmd/version.go | 19 ++++++ src/entry/main.go | 21 +++++++ 10 files changed, 354 insertions(+), 6 deletions(-) create mode 100644 .github/workflows/ci.yml create mode 100644 src/entry/cmd/config.go create mode 100644 src/entry/cmd/init.go create mode 100644 src/entry/cmd/root.go create mode 100644 src/entry/cmd/start.go create mode 100644 src/entry/cmd/update.go create mode 100644 src/entry/cmd/version.go create mode 100644 src/entry/main.go diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..9324f5a --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,63 @@ +name: CI + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +permissions: + contents: read + +jobs: + test: + name: Go tests + runs-on: ubuntu-latest + defaults: + run: + working-directory: src + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version-file: src/go.mod + cache: true + + - name: Run tests via Makefile + run: make test VERBOSE=1 + + - name: Print coverage summary + if: always() + run: | + if [ -f build/coverage.txt ]; then + go tool cover -func=build/coverage.txt | tail -n 1 + else + echo "No coverage file found" + fi + + lint: + name: Lint + runs-on: ubuntu-latest + defaults: + run: + working-directory: src + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version-file: src/go.mod + cache: true + + - name: GolangCI-Lint + uses: golangci/golangci-lint-action@v6 + with: + version: v1.60.3 + working-directory: src + args: --timeout=5m + diff --git a/src/.gitignore b/src/.gitignore index 4e9c87a..d163863 100644 --- a/src/.gitignore +++ b/src/.gitignore @@ -1,2 +1 @@ -build/ -bin/ \ No newline at end of file +build/ \ No newline at end of file diff --git a/src/Makefile b/src/Makefile index 1305145..b083fb6 100644 --- a/src/Makefile +++ b/src/Makefile @@ -80,8 +80,7 @@ ifeq (${VERBOSE}, 1) go env endif @mkdir -p ${BUILD_DIR} - GOARCH=arm64 go build ${GOARGS} -trimpath -tags "${GOTAGS}" -ldflags "${LDFLAGS}" -o ${BUILD_DIR} ./bin... - mv ${BUILD_DIR}/bin ${BUILD_DIR}/ocf-arm + GOARCH=arm64 go build ${GOARGS} -trimpath -tags "${GOTAGS}" -ldflags "${LDFLAGS}" -o ${BUILD_DIR} ./entry... @${MAKE} post-build .PHONY: build @@ -92,8 +91,7 @@ ifeq (${VERBOSE}, 1) endif @mkdir -p ${BUILD_DIR} - go build ${GOARGS} -trimpath -tags "${GOTAGS}" -ldflags "${LDFLAGS}" -o ${BUILD_DIR} ./bin... - mv ${BUILD_DIR}/bin ${BUILD_DIR}/ocf-amd64 + go build ${GOARGS} -trimpath -tags "${GOTAGS}" -ldflags "${LDFLAGS}" -o ${BUILD_DIR} ./entry... @${MAKE} post-build .PHONY: build-release-deps diff --git a/src/entry/cmd/config.go b/src/entry/cmd/config.go new file mode 100644 index 0000000..72feac8 --- /dev/null +++ b/src/entry/cmd/config.go @@ -0,0 +1,43 @@ +package cmd + +type P2PConfig struct { + Port string `json:"port" yaml:"port"` +} + +type VaccumConfig struct { + Interval int `json:"interval" yaml:"interval"` +} + +type QueueConfig struct { + Port string `json:"port" yaml:"port"` +} + +type Config struct { + Path string `json:"path" yaml:"path"` + Port string `json:"port" yaml:"port"` + Name string `json:"name" yaml:"name"` + P2p P2PConfig `json:"p2p" yaml:"p2p"` + Vacuum VaccumConfig `json:"vacuum" yaml:"vacuum"` + Queue QueueConfig `json:"queue" yaml:"queue"` + Account AccountConfig `json:"account" yaml:"account"` + Seed string `json:"seed" yaml:"seed"` + TCPPort string `json:"tcp_port" yaml:"tcp_port"` + UDPPort string `json:"udp_port" yaml:"udp_port"` +} + +type AccountConfig struct { + Wallet string `json:"wallet" yaml:"wallet"` +} + +var defaultConfig = Config{ + Seed: "0", + Path: "", + Port: "8092", + Name: "relay", + TCPPort: "43905", + UDPPort: "59820", + P2p: P2PConfig{Port: "8093"}, + Vacuum: VaccumConfig{Interval: 10}, + Queue: QueueConfig{Port: "8094"}, + Account: AccountConfig{Wallet: ""}, +} \ No newline at end of file diff --git a/src/entry/cmd/init.go b/src/entry/cmd/init.go new file mode 100644 index 0000000..e73c227 --- /dev/null +++ b/src/entry/cmd/init.go @@ -0,0 +1,12 @@ +package cmd + +import ( + "github.com/spf13/cobra" +) + +var initCmd = &cobra.Command{ + Use: "init", + Short: "Initialize the system, create the database and the config file", + Run: func(cmd *cobra.Command, args []string) { + + }} \ No newline at end of file diff --git a/src/entry/cmd/root.go b/src/entry/cmd/root.go new file mode 100644 index 0000000..5f614cf --- /dev/null +++ b/src/entry/cmd/root.go @@ -0,0 +1,127 @@ +package cmd + +import ( + "fmt" + "ocf/internal/common" + "os" + "path" + "strconv" + + homedir "github.com/mitchellh/go-homedir" + "github.com/spf13/cobra" + "github.com/spf13/pflag" + "github.com/spf13/viper" +) + +var cfgFile string +var rootcmd = &cobra.Command{ + Use: "ocfcore", + Short: "ocfcore", + Long: ``, + PersistentPreRunE: func(cmd *cobra.Command, args []string) error { + return initConfig(cmd) + }, + Run: func(cmd *cobra.Command, args []string) { + err := cmd.Help() + if err != nil { + common.Logger.Error("Could not print help", "error", err) + } + }, +} + +//nolint:gochecknoinits +func init() { + rootcmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $HOME/.config/ocf/cfg.yaml)") + + startCmd.Flags().String("wallet.account", "", "wallet account") + startCmd.Flags().String("bootstrap.addr", "http://152.67.71.5:8092/v1/dnt/bootstraps", "bootstrap address") + startCmd.Flags().String("seed", "0", "Seed") + startCmd.Flags().String("mode", "node", "Mode (standalone, local, full)") + startCmd.Flags().String("tcpport", "43905", "TCP Port") + startCmd.Flags().String("udpport", "59820", "UDP Port") + startCmd.Flags().String("subprocess", "", "Subprocess to start") + startCmd.Flags().String("public-addr", "", "Public address if you have one (by setting this, you can be a bootstrap node)") + startCmd.Flags().String("service.name", "", "Service name") + startCmd.Flags().String("service.port", "", "Service port") + startCmd.Flags().Bool("cleanslate", true, "Clean slate") + rootcmd.AddCommand(initCmd) + rootcmd.AddCommand(startCmd) + rootcmd.AddCommand(versionCmd) + rootcmd.AddCommand(updateCmd) +} + +func initConfig(cmd *cobra.Command) error { + var home string + var err error + // Don't forget to read config either from cfgFile or from home directory! + if cfgFile != "" { + // Use config file from the flag. + viper.SetConfigFile(cfgFile) + // print out the config file + common.Logger.Info("Using config file: ", viper.ConfigFileUsed()) + } else { + // Find home directory. + home, err = homedir.Dir() + if err != nil { + fmt.Println(err) + os.Exit(1) + } + viper.SetConfigFile(path.Join(home, ".config", "ocf", "cfg.yaml")) + } + if err = viper.ReadInConfig(); err != nil { + viper.SetDefault("path", defaultConfig.Path) + viper.SetDefault("port", defaultConfig.Port) + viper.SetDefault("name", defaultConfig.Name) + viper.SetDefault("p2p", defaultConfig.P2p) + viper.SetDefault("tcpport", defaultConfig.TCPPort) + viper.SetDefault("udpport", defaultConfig.UDPPort) + viper.SetDefault("vacuum.interval", defaultConfig.Vacuum.Interval) + viper.SetDefault("queue.port", defaultConfig.Queue.Port) + configPath := path.Join(home, ".config", "ocf", "cfg.yaml") + err = os.MkdirAll(path.Dir(configPath), os.ModePerm) + if err != nil { + common.Logger.Error("Could not create config directory", "error", err) + os.Exit(1) + } + + if err = viper.SafeWriteConfigAs(configPath); err != nil { + if os.IsNotExist(err) { + err = viper.WriteConfigAs(configPath) + if err != nil { + common.Logger.Warn("Cannot write config file", "error", err) + } + } + } + } + // Bind each Cobra Flag to its associated Viper Key + cmd.Flags().VisitAll(func(flag *pflag.Flag) { + if flag.Changed || !viper.IsSet(flag.Name) { + switch flag.Value.Type() { + case "bool": + value, err := strconv.ParseBool(flag.Value.String()) + if err != nil { + viper.Set(flag.Name, flag.Value) + } else { + viper.Set(flag.Name, value) + } + case "int": + value, err := strconv.ParseInt(flag.Value.String(), 0, 64) + if err != nil { + viper.Set(flag.Name, flag.Value) + } else { + viper.Set(flag.Name, value) + } + default: + viper.Set(flag.Name, flag.Value) + } + } + }) + return nil +} + +func Execute() { + if err := rootcmd.Execute(); err != nil { + fmt.Println(err) + os.Exit(1) + } +} \ No newline at end of file diff --git a/src/entry/cmd/start.go b/src/entry/cmd/start.go new file mode 100644 index 0000000..3b115f6 --- /dev/null +++ b/src/entry/cmd/start.go @@ -0,0 +1,23 @@ +package cmd + +import ( + "ocf/internal/common" + "ocf/internal/protocol" + "ocf/internal/server" + + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +var startCmd = &cobra.Command{ + Use: "start", + Short: "Start listening for incoming connections", + Run: func(cmd *cobra.Command, args []string) { + // check if cleanslate is set + if viper.GetBool("cleanslate") { + // clean slate, by removing the database + common.Logger.Info("Cleaning slate") + protocol.ClearCRDTStore() + } + server.StartServer() + }} \ No newline at end of file diff --git a/src/entry/cmd/update.go b/src/entry/cmd/update.go new file mode 100644 index 0000000..3b85d35 --- /dev/null +++ b/src/entry/cmd/update.go @@ -0,0 +1,43 @@ +package cmd + +import ( + "fmt" + "net/http" + "ocf/internal/common" + "runtime" + + "github.com/minio/selfupdate" + "github.com/spf13/cobra" +) + +func doUpdate() error { + // detect cpu arch + arch := runtime.GOARCH + url := "https://filedn.eu/lougUsdPvd1uJK2jfOYWogH/releases/ocf-" + arch + common.Logger.Info("Downloading from ", url) + resp, err := http.Get(url) + if err != nil { + return err + } + defer resp.Body.Close() + err = selfupdate.Apply(resp.Body, selfupdate.Options{}) + if err != nil { + // error handling + } + return err +} + +var updateCmd = &cobra.Command{ + Use: "update", + Short: "Update the Open Compute Binary", + Run: func(cmd *cobra.Command, args []string) { + fmt.Printf("current ocfcore version %s", common.JSONVersion.Version) + fmt.Printf(" (commit: %s)", common.JSONVersion.Commit) + fmt.Printf(" (built at: %s)", common.JSONVersion.Date) + fmt.Println() + err := doUpdate() + if err != nil { + common.Logger.Error("Error while updating: ", err) + } + }, +} \ No newline at end of file diff --git a/src/entry/cmd/version.go b/src/entry/cmd/version.go new file mode 100644 index 0000000..e1c84ce --- /dev/null +++ b/src/entry/cmd/version.go @@ -0,0 +1,19 @@ +package cmd + +import ( + "fmt" + "ocf/internal/common" + + "github.com/spf13/cobra" +) + +var versionCmd = &cobra.Command{ + Use: "version", + Short: "Print the version of ocfcore", + Run: func(cmd *cobra.Command, args []string) { + fmt.Printf("ocfcore version %s", common.JSONVersion.Version) + fmt.Printf(" (commit: %s)", common.JSONVersion.Commit) + fmt.Printf(" (built at: %s)", common.JSONVersion.Date) + fmt.Println() + }, +} \ No newline at end of file diff --git a/src/entry/main.go b/src/entry/main.go new file mode 100644 index 0000000..fd6eb36 --- /dev/null +++ b/src/entry/main.go @@ -0,0 +1,21 @@ +package main + +import ( + "ocf/entry/cmd" + "ocf/internal/common" +) + +var ( + // Populated during build + version = "dev" + commitHash = "?" + buildDate = "" + buildSecret = "" +) + +func main() { + common.JSONVersion.Version = version + common.JSONVersion.Commit = commitHash + common.JSONVersion.Date = buildDate + cmd.Execute() +} \ No newline at end of file From f9ddbf423de624d97a7211f77cc4591fd104e120 Mon Sep 17 00:00:00 2001 From: Xiaozhe Yao Date: Sat, 9 Aug 2025 21:34:38 +0200 Subject: [PATCH 08/19] update logic for disconnection detection --- src/.gitignore | 3 ++- src/.golangci.yml | 12 ++++++++++++ src/Makefile | 11 +++++++---- src/entry/cmd/update.go | 10 +++++----- src/entry/main.go | 4 +++- src/internal/common/logger.go | 5 ++--- src/internal/protocol/clock.go | 27 +++++++++++++++++++++------ src/internal/protocol/host.go | 8 +++----- src/internal/protocol/key.go | 1 + tools/establish_connection.py | 20 ++++++++++++++++++++ 10 files changed, 76 insertions(+), 25 deletions(-) create mode 100644 src/.golangci.yml create mode 100644 tools/establish_connection.py diff --git a/src/.gitignore b/src/.gitignore index d163863..4e9c87a 100644 --- a/src/.gitignore +++ b/src/.gitignore @@ -1 +1,2 @@ -build/ \ No newline at end of file +build/ +bin/ \ No newline at end of file diff --git a/src/.golangci.yml b/src/.golangci.yml new file mode 100644 index 0000000..428cd9e --- /dev/null +++ b/src/.golangci.yml @@ -0,0 +1,12 @@ +run: + timeout: 5m + modules-download-mode: mod + go: "1.23" + +linters: + disable: + - typecheck + +issues: + exclude-dirs-use-default: true + diff --git a/src/Makefile b/src/Makefile index b083fb6..e37bf14 100644 --- a/src/Makefile +++ b/src/Makefile @@ -24,7 +24,10 @@ endif # Dependency versions GOTESTSUM_VERSION ?= 0.4.2 -GOLANGCI_VERSION ?= 1.60.3 +GOLANGCI_VERSION ?= 1.61.0 +# Default flags for golangci-lint +GOLANGCI_FLAGS ?= --timeout=5m +LINT_PKGS ?= ./internal/common/... ./entry/... ./bin/... GOLANG_VERSION ?= 1.14 @@ -111,7 +114,7 @@ post-build-release: ${POST_BUILD_RELEASE_TARGETS} build-release: build-release-deps pre-build-release build-release: ## Build binaries without debug information @${MAKE} LDFLAGS="-w ${LDFLAGS}" GOARGS="${GOARGS} -trimpath" BUILD_DIR="${BUILD_DIR}/release" build - + mv build/release/entry build/release/ocf-amd64 @${MAKE} post-build-release .PHONY: build-debug-deps @@ -157,12 +160,12 @@ bin/golangci-lint: bin/golangci-lint-${GOLANGCI_VERSION} @ln -sf golangci-lint-${GOLANGCI_VERSION} bin/golangci-lint bin/golangci-lint-${GOLANGCI_VERSION}: @mkdir -p bin - curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | BINARY=golangci-lint bash -s -- v${GOLANGCI_VERSION} + GOBIN=$(abspath bin) go install github.com/golangci/golangci-lint/cmd/golangci-lint@v${GOLANGCI_VERSION} @mv bin/golangci-lint $@ .PHONY: lint lint: bin/golangci-lint ## Run linters - bin/golangci-lint run ./... + GOTOOLCHAIN=go1.23.0 bin/golangci-lint run ${GOLANGCI_FLAGS} ${LINT_PKGS} release-%: TAG_PREFIX = v release-%: diff --git a/src/entry/cmd/update.go b/src/entry/cmd/update.go index 3b85d35..7fbccdd 100644 --- a/src/entry/cmd/update.go +++ b/src/entry/cmd/update.go @@ -20,11 +20,11 @@ func doUpdate() error { return err } defer resp.Body.Close() - err = selfupdate.Apply(resp.Body, selfupdate.Options{}) - if err != nil { - // error handling - } - return err + err = selfupdate.Apply(resp.Body, selfupdate.Options{}) + if err != nil { + return err + } + return nil } var updateCmd = &cobra.Command{ diff --git a/src/entry/main.go b/src/entry/main.go index fd6eb36..662ccf1 100644 --- a/src/entry/main.go +++ b/src/entry/main.go @@ -10,12 +10,14 @@ var ( version = "dev" commitHash = "?" buildDate = "" - buildSecret = "" + // buildSecret left for future use to verify official builds + buildSecret string ) func main() { common.JSONVersion.Version = version common.JSONVersion.Commit = commitHash common.JSONVersion.Date = buildDate + _ = buildSecret cmd.Execute() } \ No newline at end of file diff --git a/src/internal/common/logger.go b/src/internal/common/logger.go index ee16381..3f140ea 100644 --- a/src/internal/common/logger.go +++ b/src/internal/common/logger.go @@ -16,9 +16,8 @@ func init() { } config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder config.Level.SetLevel(zapcore.InfoLevel) - zapLogger, err := config.Build() - // trunk-ignore(golangci-lint/errcheck) - defer zapLogger.Sync() + zapLogger, err := config.Build() + defer func() { _ = zapLogger.Sync() }() if err != nil { panic(err) } diff --git a/src/internal/protocol/clock.go b/src/internal/protocol/clock.go index 24a837c..5c5e3b6 100644 --- a/src/internal/protocol/clock.go +++ b/src/internal/protocol/clock.go @@ -1,6 +1,7 @@ package protocol import ( + "context" "encoding/json" "math/rand" "ocf/internal/common" @@ -11,6 +12,7 @@ import ( ds "github.com/ipfs/go-datastore" "github.com/jasonlvhit/gocron" "github.com/libp2p/go-libp2p/core/network" + libpeer "github.com/libp2p/go-libp2p/core/peer" ) // var verificationKey = "ocf-verification-key" @@ -33,16 +35,29 @@ func StartTicker() { // updateMyself() for _, peer_id := range peers { // check if peer is still connected - peer, error := GetPeerFromTable(peer_id.String()) + p, error := GetPeerFromTable(peer_id.String()) if error == nil { - peer.Connected = true + p.Connected = true if peer_id != host.ID() && host.Network().Connectedness(peer_id) != network.Connected { - common.Logger.Info("Peer:" + peer_id.String() + " got disconnected!") - peer.Connected = false + // try to dial the peer, if cannot dial, then mark it as disconnected + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + addrInfo := libpeer.AddrInfo{ID: peer_id, Addrs: host.Peerstore().Addrs(peer_id)} + if len(addrInfo.Addrs) == 0 { + common.Logger.Warnf("No known addresses for peer %s; marking disconnected", peer_id) + p.Connected = false + } else if err := host.Connect(ctx, addrInfo); err != nil { + common.Logger.With("err", err).Warnf("Failed to dial peer %s; marking disconnected", peer_id) + p.Connected = false + } else { + // Successfully reconnected + common.Logger.Infof("Reconnected to peer %s", peer_id) + p.Connected = true + } } // update last seen timestamp - peer.LastSeen = time.Now().Unix() - value, err := json.Marshal(peer) + p.LastSeen = time.Now().Unix() + value, err := json.Marshal(p) if err == nil { UpdateNodeTableHook(ds.NewKey(peer_id.String()), value) } diff --git a/src/internal/protocol/host.go b/src/internal/protocol/host.go index d2e2715..fd532cb 100644 --- a/src/internal/protocol/host.go +++ b/src/internal/protocol/host.go @@ -72,6 +72,7 @@ func newHost(ctx context.Context, seed int64, ds datastore.Batching) (host.Host, if err != nil { return nil, err } + writeKeyToFile(priv) } } else { r := mrand.New(mrand.NewSource(seed)) @@ -79,16 +80,13 @@ func newHost(ctx context.Context, seed int64, ds datastore.Batching) (host.Host, if err != nil { return nil, err } + writeKeyToFile(priv) } - // persist private key - writeKeyToFile(priv) if err != nil { return nil, err } - // Configure resource manager with higher limits limits := rcmgr.DefaultLimits.AutoScale() - // Increase connection limits significantly for a distributed system systemLimits := rcmgr.ResourceLimits{ ConnsInbound: 1000, // Allow up to 1000 inbound connections @@ -97,7 +95,7 @@ func newHost(ctx context.Context, seed int64, ds datastore.Batching) (host.Host, StreamsInbound: 10000, // Increase stream limits StreamsOutbound: 10000, Streams: 20000, - Memory: 1 << 30, // 1GB memory limit + Memory: 16 << 30, // 16GB memory limit } // Apply the custom limits diff --git a/src/internal/protocol/key.go b/src/internal/protocol/key.go index f9c9233..0093536 100644 --- a/src/internal/protocol/key.go +++ b/src/internal/protocol/key.go @@ -40,6 +40,7 @@ func loadKeyFromFile() crypto.PrivKey { return nil } keyPath := path.Join(home, ".ocfcore", "keys", "id") + common.Logger.Info("Looking for keys under: ", keyPath) keyData, err := os.ReadFile(keyPath) if err != nil { return nil diff --git a/tools/establish_connection.py b/tools/establish_connection.py new file mode 100644 index 0000000..9c8e11f --- /dev/null +++ b/tools/establish_connection.py @@ -0,0 +1,20 @@ +import os +import subprocess + +command = """curl http://148.187.108.172:8092/v1/p2p/
/v1/_service/llm/v1/chat/completions \ + -H 'Authorization: Bearer YOUR_API_KEY' \ + -H 'Content-Type: application/json' \ + -d '{ + "model": "Qwen/Qwen3-32B", + "messages": [ + { "role": "system", "content": "You are a helpful assistant." }, + { "role": "user", "content": "What is the capital of France?" } + ], + "temperature": 0.7 + }'""" + +address = "QmNNRmz2etg76yFhc15cqvVPEKjCywSLSRFiN3brzYEV6u" +cmd = command.replace("
", address) +print(cmd) +# Use subprocess to avoid shell interpretation issues +subprocess.run(cmd, shell=True, check=True) \ No newline at end of file From 1cb7eb146f4069cb0f73497958d1844b65f3883d Mon Sep 17 00:00:00 2001 From: Xiaozhe Yao Date: Sat, 9 Aug 2025 23:56:56 +0200 Subject: [PATCH 09/19] update gossip mesh --- src/internal/common/utils.go | 13 ++++++ src/internal/protocol/clock.go | 8 +++- src/internal/protocol/crdt.go | 16 ++++++- src/internal/protocol/host.go | 69 ++++++++++++++++++++++++----- src/internal/protocol/node_table.go | 1 - tools/establish_connection.py | 4 +- 6 files changed, 92 insertions(+), 19 deletions(-) create mode 100644 src/internal/common/utils.go diff --git a/src/internal/common/utils.go b/src/internal/common/utils.go new file mode 100644 index 0000000..ab86243 --- /dev/null +++ b/src/internal/common/utils.go @@ -0,0 +1,13 @@ +package common + +func DeduplicateStrings(input []string) []string { + output := []string{} + seen := make(map[string]struct{}) + for _, s := range input { + if _, ok := seen[s]; !ok { + seen[s] = struct{}{} + output = append(output, s) + } + } + return output +} diff --git a/src/internal/protocol/clock.go b/src/internal/protocol/clock.go index 5c5e3b6..6080313 100644 --- a/src/internal/protocol/clock.go +++ b/src/internal/protocol/clock.go @@ -37,9 +37,11 @@ func StartTicker() { // check if peer is still connected p, error := GetPeerFromTable(peer_id.String()) if error == nil { - p.Connected = true - if peer_id != host.ID() && host.Network().Connectedness(peer_id) != network.Connected { + if host.Network().Connectedness(peer_id) == network.Connected { + p.Connected = true + } else if peer_id != host.ID() && host.Network().Connectedness(peer_id) != network.Connected { // try to dial the peer, if cannot dial, then mark it as disconnected + common.Logger.Info("Dialing ", peer_id.String(), "...") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() addrInfo := libpeer.AddrInfo{ID: peer_id, Addrs: host.Peerstore().Addrs(peer_id)} @@ -60,6 +62,8 @@ func StartTicker() { value, err := json.Marshal(p) if err == nil { UpdateNodeTableHook(ds.NewKey(peer_id.String()), value) + } else { + common.Logger.Error("Error while marshalling peer: ", peer_id.String(), err) } } } diff --git a/src/internal/protocol/crdt.go b/src/internal/protocol/crdt.go index d6d3368..b3fb9e5 100644 --- a/src/internal/protocol/crdt.go +++ b/src/internal/protocol/crdt.go @@ -39,8 +39,11 @@ func GetCRDTStore() (*crdt.Datastore, context.CancelFunc) { ipfs, err = ipfslite.New(ctx, store, nil, host, &dht, nil) common.ReportError(err, "Error while creating ipfs lite node") - - psub, err := pubsub.NewGossipSub(ctx, host) + pubsubParams := pubsub.DefaultGossipSubParams() + pubsubParams.D = 128 + pubsubParams.Dlo = 16 + pubsubParams.Dhi = 256 + psub, err := pubsub.NewGossipSub(ctx, host, pubsub.WithGossipSubParams(pubsubParams)) common.ReportError(err, "Error while creating pubsub") topic, err := psub.Join(pubsubNet) @@ -57,6 +60,15 @@ func GetCRDTStore() (*crdt.Datastore, context.CancelFunc) { break } host.ConnManager().TagPeer(msg.ReceivedFrom, "keep", 100) + // Update LastSeen when we receive a message from a peer + p, gerr := GetPeerFromTable(msg.ReceivedFrom.String()) + if gerr != nil { + p = Peer{ID: msg.ReceivedFrom.String()} + } + p.LastSeen = time.Now().Unix() + if b, merr := json.Marshal(p); merr == nil { + UpdateNodeTableHook(ds.NewKey(msg.ReceivedFrom.String()), b) + } } }() diff --git a/src/internal/protocol/host.go b/src/internal/protocol/host.go index fd532cb..b87341a 100644 --- a/src/internal/protocol/host.go +++ b/src/internal/protocol/host.go @@ -3,6 +3,7 @@ package protocol import ( "context" "crypto/rand" + "encoding/json" mrand "math/rand" "ocf/internal/common" "strconv" @@ -21,6 +22,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" + connmgr "github.com/libp2p/go-libp2p/p2p/net/connmgr" "github.com/libp2p/go-libp2p/p2p/security/noise" libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls" "github.com/spf13/viper" @@ -52,15 +54,17 @@ func GetP2PNode(ds datastore.Batching) (host.Host, dualdht.DHT) { } func newHost(ctx context.Context, seed int64, ds datastore.Batching) (host.Host, error) { - // connmgr, err := connmgr.NewConnManager( - // 50, - // 500, - // connmgr.WithGracePeriod(time.Minute), - // ) - // if err != nil { - // common.Logger.Error("Error while creating connection manager: %v", err) - // } var err error + // Connection manager: maintain a larger pool of connections so we can exceed + // the pubsub mesh degree and keep more peers around. + cm, err := connmgr.NewConnManager( + 100, // Low watermark + 800, // High watermark + connmgr.WithGracePeriod(5*time.Minute), + ) + if err != nil { + common.Logger.Error("Error while creating connection manager: ", err) + } var priv crypto.PrivKey // try to load the private key from file if seed == 0 { @@ -103,9 +107,9 @@ func newHost(ctx context.Context, seed int64, ds datastore.Batching) (host.Host, System: systemLimits, // Keep default peer limits but increase them slightly PeerDefault: rcmgr.ResourceLimits{ - ConnsInbound: 16, // Allow more connections per peer - ConnsOutbound: 16, - Conns: 32, + ConnsInbound: 512, // Allow more connections per peer + ConnsOutbound: 512, + Conns: 1024, }, }.Build(limits) @@ -119,7 +123,7 @@ func newHost(ctx context.Context, seed int64, ds datastore.Batching) (host.Host, libp2p.DefaultTransports, libp2p.Identity(priv), libp2p.ResourceManager(mgr), // Use our custom resource manager - // libp2p.ConnectionManager(connmgr), + libp2p.ConnectionManager(cm), libp2p.NATPortMap(), libp2p.ListenAddrStrings( "/ip4/0.0.0.0/tcp/"+viper.GetString("tcpport"), @@ -149,9 +153,45 @@ func newHost(ctx context.Context, seed int64, ds datastore.Batching) (host.Host, common.Logger.Info("Connected to peer: ", c.RemotePeer(), " Total connections: ", len(n.Conns())) // On (re)connections, re-announce local services go ReannounceLocalServices() + + // Mark peer as connected in node table immediately + go func(pid peer.ID) { + // Avoid updating self + if pid == host.ID() { + return + } + p, err := GetPeerFromTable(pid.String()) + if err != nil { + p = Peer{ID: pid.String()} + } + p.Connected = true + p.LastSeen = time.Now().Unix() + if b, e := json.Marshal(p); e == nil { + UpdateNodeTableHook(datastore.NewKey(pid.String()), b) + } else { + common.Logger.Error("Failed to marshal peer on connect: ", e) + } + }(c.RemotePeer()) }, DisconnectedF: func(n network.Network, c network.Conn) { common.Logger.Info("Disconnected from peer: ", c.RemotePeer(), " Total connections: ", len(n.Conns())) + // Mark peer as disconnected in node table immediately + go func(pid peer.ID) { + if pid == host.ID() { + return + } + p, err := GetPeerFromTable(pid.String()) + if err != nil { + p = Peer{ID: pid.String()} + } + p.Connected = false + // keep LastSeen as last known good; do not bump here + if b, e := json.Marshal(p); e == nil { + UpdateNodeTableHook(datastore.NewKey(pid.String()), b) + } else { + common.Logger.Error("Failed to marshal peer on disconnect: ", e) + } + }(c.RemotePeer()) }, }) @@ -247,6 +287,11 @@ func ConnectedBootstraps() []string { } } } + // add myself as bootstrap + myaddr := host.Addrs()[0].String() + "/p2p/" + host.ID().String() + bootstraps = append(bootstraps, myaddr) + // deduplicate + bootstraps = common.DeduplicateStrings(bootstraps) return bootstraps } diff --git a/src/internal/protocol/node_table.go b/src/internal/protocol/node_table.go index c0a2788..c062d15 100644 --- a/src/internal/protocol/node_table.go +++ b/src/internal/protocol/node_table.go @@ -124,7 +124,6 @@ func UpdateNodeTableHook(key ds.Key, value []byte) { common.ReportError(err, "Error while unmarshalling peer") // Preserve locally computed connectivity status if we already know this peer if existing, ok := table[key.String()]; ok { - peer.Connected = existing.Connected // If LastSeen is missing in the update, keep the existing one if peer.LastSeen == 0 { peer.LastSeen = existing.LastSeen diff --git a/tools/establish_connection.py b/tools/establish_connection.py index 9c8e11f..6b4eb00 100644 --- a/tools/establish_connection.py +++ b/tools/establish_connection.py @@ -5,7 +5,7 @@ -H 'Authorization: Bearer YOUR_API_KEY' \ -H 'Content-Type: application/json' \ -d '{ - "model": "Qwen/Qwen3-32B", + "model": "swissai/apertus3-70b-15T-sft", "messages": [ { "role": "system", "content": "You are a helpful assistant." }, { "role": "user", "content": "What is the capital of France?" } @@ -13,7 +13,7 @@ "temperature": 0.7 }'""" -address = "QmNNRmz2etg76yFhc15cqvVPEKjCywSLSRFiN3brzYEV6u" +address = "QmSNB58JK6TvpWpKqAQMJSmvZbzWLy5Qp9jkT8pNp9cJf5" cmd = command.replace("
", address) print(cmd) # Use subprocess to avoid shell interpretation issues From 42421f38e00b189638a85a7819faf6b4935a68fb Mon Sep 17 00:00:00 2001 From: Xiaozhe Yao Date: Mon, 11 Aug 2025 06:22:05 +0200 Subject: [PATCH 10/19] completely disable resource manager --- meta/build_docker.sh | 1 + src/internal/protocol/host.go | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) create mode 100644 meta/build_docker.sh diff --git a/meta/build_docker.sh b/meta/build_docker.sh new file mode 100644 index 0000000..d8de63b --- /dev/null +++ b/meta/build_docker.sh @@ -0,0 +1 @@ +docker build -f meta/Dockerfile.amd64 -t ghcr.io/xiaozheyao/ocf:amd64-dev . && docker push ghcr.io/xiaozheyao/ocf:amd64-dev \ No newline at end of file diff --git a/src/internal/protocol/host.go b/src/internal/protocol/host.go index b87341a..6feea1d 100644 --- a/src/internal/protocol/host.go +++ b/src/internal/protocol/host.go @@ -122,8 +122,8 @@ func newHost(ctx context.Context, seed int64, ds datastore.Batching) (host.Host, opts := []libp2p.Option{ libp2p.DefaultTransports, libp2p.Identity(priv), - libp2p.ResourceManager(mgr), // Use our custom resource manager - libp2p.ConnectionManager(cm), + libp2p.ResourceManager(&network.NullResourceManager{}), + // libp2p.ConnectionManager(connmgr), libp2p.NATPortMap(), libp2p.ListenAddrStrings( "/ip4/0.0.0.0/tcp/"+viper.GetString("tcpport"), @@ -237,7 +237,7 @@ func newDHT(ctx context.Context, h host.Host, ds datastore.Batching) (*dualdht.D dhtOpts := []dualdht.Option{ dualdht.DHTOption(dht.NamespacedValidator("pk", record.PublicKeyValidator{})), dualdht.DHTOption(dht.NamespacedValidator("ipns", ipns.Validator{KeyBook: h.Peerstore()})), - // dualdht.DHTOption(dht.Concurrency(500)), + dualdht.DHTOption(dht.Concurrency(512)), dualdht.DHTOption(dht.Mode(dht.ModeAuto)), } if ds != nil { From fbfcfaafe80c811486af6207d9d1ef896248a634 Mon Sep 17 00:00:00 2001 From: Elia Palme Date: Mon, 11 Aug 2025 10:56:46 +0200 Subject: [PATCH 11/19] Added multiple serving instances --- local-demo/docker-compose.yml | 217 +++++++++++++++++++++++++++++++++- 1 file changed, 216 insertions(+), 1 deletion(-) diff --git a/local-demo/docker-compose.yml b/local-demo/docker-compose.yml index ee1c584..d9697d9 100644 --- a/local-demo/docker-compose.yml +++ b/local-demo/docker-compose.yml @@ -30,7 +30,7 @@ services: ports: - 8092:8092 - 43905:43905 - llm-srevice: + llm-srevice-1: image: ocf-sql-amd64 build: context: ../ @@ -57,3 +57,218 @@ services: ports: - 8093:8092 - 43906:43905 + llm-srevice-2: + image: ocf-sql-amd64 + build: + context: ../ + dockerfile: ./local-demo/docker/llm-serving/Dockerfile + command: + [ + "start", + "--bootstrap.addr", + "/ip4/192.168.200.10/tcp/43905/p2p/QmcMpnf39qfJcXssHrFFw7nvAioLd4SXKhzBZ4XMcLDoSU", + "--subprocess", + "uvicorn serving-engine.main:app --host 0.0.0.0 --port 8080", + "--service.name", + "llm", + "--service.port", + "8080" + ] + networks: + ocf-internal: + ipv4_address: 192.168.200.22 + ports: + - 8094:8092 + - 43907:43905 + + llm-srevice-3: + image: ocf-sql-amd64 + build: + context: ../ + dockerfile: ./local-demo/docker/llm-serving/Dockerfile + command: + [ + "start", + "--bootstrap.addr", + "/ip4/192.168.200.10/tcp/43905/p2p/QmcMpnf39qfJcXssHrFFw7nvAioLd4SXKhzBZ4XMcLDoSU", + "--subprocess", + "uvicorn serving-engine.main:app --host 0.0.0.0 --port 8080", + "--service.name", + "llm", + "--service.port", + "8080" + ] + networks: + ocf-internal: + ipv4_address: 192.168.200.23 + ports: + - 8095:8092 + - 43908:43905 + + llm-srevice-4: + image: ocf-sql-amd64 + build: + context: ../ + dockerfile: ./local-demo/docker/llm-serving/Dockerfile + command: + [ + "start", + "--bootstrap.addr", + "/ip4/192.168.200.10/tcp/43905/p2p/QmcMpnf39qfJcXssHrFFw7nvAioLd4SXKhzBZ4XMcLDoSU", + "--subprocess", + "uvicorn serving-engine.main:app --host 0.0.0.0 --port 8080", + "--service.name", + "llm", + "--service.port", + "8080" + ] + networks: + ocf-internal: + ipv4_address: 192.168.200.24 + ports: + - 8096:8092 + - 43910:43905 + + llm-srevice-5: + image: ocf-sql-amd64 + build: + context: ../ + dockerfile: ./local-demo/docker/llm-serving/Dockerfile + command: + [ + "start", + "--bootstrap.addr", + "/ip4/192.168.200.10/tcp/43905/p2p/QmcMpnf39qfJcXssHrFFw7nvAioLd4SXKhzBZ4XMcLDoSU", + "--subprocess", + "uvicorn serving-engine.main:app --host 0.0.0.0 --port 8080", + "--service.name", + "llm", + "--service.port", + "8080" + ] + networks: + ocf-internal: + ipv4_address: 192.168.200.25 + ports: + - 8097:8092 + - 43911:43905 + + llm-srevice-6: + image: ocf-sql-amd64 + build: + context: ../ + dockerfile: ./local-demo/docker/llm-serving/Dockerfile + command: + [ + "start", + "--bootstrap.addr", + "/ip4/192.168.200.10/tcp/43905/p2p/QmcMpnf39qfJcXssHrFFw7nvAioLd4SXKhzBZ4XMcLDoSU", + "--subprocess", + "uvicorn serving-engine.main:app --host 0.0.0.0 --port 8080", + "--service.name", + "llm", + "--service.port", + "8080" + ] + networks: + ocf-internal: + ipv4_address: 192.168.200.26 + ports: + - 8098:8092 + - 43912:43905 + + llm-srevice-7: + image: ocf-sql-amd64 + build: + context: ../ + dockerfile: ./local-demo/docker/llm-serving/Dockerfile + command: + [ + "start", + "--bootstrap.addr", + "/ip4/192.168.200.10/tcp/43905/p2p/QmcMpnf39qfJcXssHrFFw7nvAioLd4SXKhzBZ4XMcLDoSU", + "--subprocess", + "uvicorn serving-engine.main:app --host 0.0.0.0 --port 8080", + "--service.name", + "llm", + "--service.port", + "8080" + ] + networks: + ocf-internal: + ipv4_address: 192.168.200.27 + ports: + - 8099:8092 + - 43913:43905 + + llm-srevice-8: + image: ocf-sql-amd64 + build: + context: ../ + dockerfile: ./local-demo/docker/llm-serving/Dockerfile + command: + [ + "start", + "--bootstrap.addr", + "/ip4/192.168.200.10/tcp/43905/p2p/QmcMpnf39qfJcXssHrFFw7nvAioLd4SXKhzBZ4XMcLDoSU", + "--subprocess", + "uvicorn serving-engine.main:app --host 0.0.0.0 --port 8080", + "--service.name", + "llm", + "--service.port", + "8080" + ] + networks: + ocf-internal: + ipv4_address: 192.168.200.28 + ports: + - 8100:8092 + - 43914:43905 + + llm-srevice-9: + image: ocf-sql-amd64 + build: + context: ../ + dockerfile: ./local-demo/docker/llm-serving/Dockerfile + command: + [ + "start", + "--bootstrap.addr", + "/ip4/192.168.200.10/tcp/43905/p2p/QmcMpnf39qfJcXssHrFFw7nvAioLd4SXKhzBZ4XMcLDoSU", + "--subprocess", + "uvicorn serving-engine.main:app --host 0.0.0.0 --port 8080", + "--service.name", + "llm", + "--service.port", + "8080" + ] + networks: + ocf-internal: + ipv4_address: 192.168.200.29 + ports: + - 8101:8092 + - 43915:43905 + + llm-srevice-0: + image: ocf-sql-amd64 + build: + context: ../ + dockerfile: ./local-demo/docker/llm-serving/Dockerfile + command: + [ + "start", + "--bootstrap.addr", + "/ip4/192.168.200.10/tcp/43905/p2p/QmcMpnf39qfJcXssHrFFw7nvAioLd4SXKhzBZ4XMcLDoSU", + "--subprocess", + "uvicorn serving-engine.main:app --host 0.0.0.0 --port 8080", + "--service.name", + "llm", + "--service.port", + "8080" + ] + networks: + ocf-internal: + ipv4_address: 192.168.200.30 + ports: + - 8102:8092 + - 43916:43905 From 9a16f0c8fff3a7ee82e825bc5631382c13a54958 Mon Sep 17 00:00:00 2001 From: Elia Palme Date: Mon, 11 Aug 2025 14:22:49 +0200 Subject: [PATCH 12/19] connmgr removal cleanup --- src/internal/protocol/host.go | 38 ----------------------------------- 1 file changed, 38 deletions(-) diff --git a/src/internal/protocol/host.go b/src/internal/protocol/host.go index 6feea1d..afeac7a 100644 --- a/src/internal/protocol/host.go +++ b/src/internal/protocol/host.go @@ -22,7 +22,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" - connmgr "github.com/libp2p/go-libp2p/p2p/net/connmgr" "github.com/libp2p/go-libp2p/p2p/security/noise" libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls" "github.com/spf13/viper" @@ -55,13 +54,6 @@ func GetP2PNode(ds datastore.Batching) (host.Host, dualdht.DHT) { func newHost(ctx context.Context, seed int64, ds datastore.Batching) (host.Host, error) { var err error - // Connection manager: maintain a larger pool of connections so we can exceed - // the pubsub mesh degree and keep more peers around. - cm, err := connmgr.NewConnManager( - 100, // Low watermark - 800, // High watermark - connmgr.WithGracePeriod(5*time.Minute), - ) if err != nil { common.Logger.Error("Error while creating connection manager: ", err) } @@ -89,41 +81,11 @@ func newHost(ctx context.Context, seed int64, ds datastore.Batching) (host.Host, if err != nil { return nil, err } - // Configure resource manager with higher limits - limits := rcmgr.DefaultLimits.AutoScale() - // Increase connection limits significantly for a distributed system - systemLimits := rcmgr.ResourceLimits{ - ConnsInbound: 1000, // Allow up to 1000 inbound connections - ConnsOutbound: 1000, // Allow up to 1000 outbound connections - Conns: 2000, // Allow up to 2000 total connections - StreamsInbound: 10000, // Increase stream limits - StreamsOutbound: 10000, - Streams: 20000, - Memory: 16 << 30, // 16GB memory limit - } - - // Apply the custom limits - finalLimits := rcmgr.PartialLimitConfig{ - System: systemLimits, - // Keep default peer limits but increase them slightly - PeerDefault: rcmgr.ResourceLimits{ - ConnsInbound: 512, // Allow more connections per peer - ConnsOutbound: 512, - Conns: 1024, - }, - }.Build(limits) - - // Create resource manager - mgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(finalLimits)) - if err != nil { - return nil, err - } opts := []libp2p.Option{ libp2p.DefaultTransports, libp2p.Identity(priv), libp2p.ResourceManager(&network.NullResourceManager{}), - // libp2p.ConnectionManager(connmgr), libp2p.NATPortMap(), libp2p.ListenAddrStrings( "/ip4/0.0.0.0/tcp/"+viper.GetString("tcpport"), From a6d1be20ce073549bbac9ba81c2eb7c9de236976 Mon Sep 17 00:00:00 2001 From: Elia Palme Date: Mon, 11 Aug 2025 14:22:59 +0200 Subject: [PATCH 13/19] add release workflow --- .github/workflows/release.yml | 36 +++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 .github/workflows/release.yml diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..6e48a22 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,36 @@ +name: Build and Release Go Binary + +on: + push: + tags: + - 'v*' # Run when pushing a version tag like v1.0.0 + +jobs: + build: + name: Build Go Binary + runs-on: ubuntu-latest + + steps: + - name: Check out source code + uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version-file: src/go.mod + cache: true + + - name: Build production binary + run: | + cd src + build-release + + + - name: Create GitHub Release + id: create_release + uses: softprops/action-gh-release@v2 + with: + files: | + build/release/ocf-amd64 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file From 7fcefe2373aedba78d1814aeb5e77afb0b3da69e Mon Sep 17 00:00:00 2001 From: Elia Palme Date: Mon, 11 Aug 2025 14:26:00 +0200 Subject: [PATCH 14/19] fix release pipeline --- .github/workflows/release.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 6e48a22..b7f02f6 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -23,7 +23,7 @@ jobs: - name: Build production binary run: | cd src - build-release + make build-release - name: Create GitHub Release From c402c568aec4448b120f1bfa4f2860d7b3e40d1a Mon Sep 17 00:00:00 2001 From: Elia Palme Date: Mon, 11 Aug 2025 14:28:54 +0200 Subject: [PATCH 15/19] fix release pipeline --- .github/workflows/release.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index b7f02f6..b2e4545 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -31,6 +31,6 @@ jobs: uses: softprops/action-gh-release@v2 with: files: | - build/release/ocf-amd64 + src/build/release/ocf-amd64 env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file From 6bc9787ce0907f3ae6199be4578d37eda7d7f6c3 Mon Sep 17 00:00:00 2001 From: Elia Palme Date: Tue, 12 Aug 2025 09:53:54 +0200 Subject: [PATCH 16/19] build arm --- .github/workflows/release.yml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index b2e4545..3a21c6a 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -23,7 +23,10 @@ jobs: - name: Build production binary run: | cd src - make build-release + GOOS=linux GOARCH=amd64 go build -ldflags "-X main.version=${VERSION} -X main.commitHash=${COMMIT_HASH} -X main.buildDate=${BUILD_DATE} -X main.authUrl=$(AUTH_URL) -X main.authClientId=$(AUTH_CLIENT_ID) -X main.authSecret=$(AUTH_CLIENT_SECRET) -X main.sentryDSN=$(SENTRY_DSN)" -o ./build/release/ ./entry... + mv build/release/entry build/release/ocf-amd64 + GOOS=linux GOARCH=arm64 go build -ldflags "-X main.version=${VERSION} -X main.commitHash=${COMMIT_HASH} -X main.buildDate=${BUILD_DATE} -X main.authUrl=$(AUTH_URL) -X main.authClientId=$(AUTH_CLIENT_ID) -X main.authSecret=$(AUTH_CLIENT_SECRET) -X main.sentryDSN=$(SENTRY_DSN)" -o ./build/release/ ./entry... + mv build/release/entry build/release/ocf-arm64 - name: Create GitHub Release @@ -32,5 +35,6 @@ jobs: with: files: | src/build/release/ocf-amd64 + src/build/release/ocf-arm64 env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file From 65dbb7855646beb8c25eac77e79c51c8cf07beea Mon Sep 17 00:00:00 2001 From: Elia Palme Date: Tue, 12 Aug 2025 09:54:08 +0200 Subject: [PATCH 17/19] build arm --- .github/workflows/release.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 3a21c6a..a6f8005 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -23,9 +23,9 @@ jobs: - name: Build production binary run: | cd src - GOOS=linux GOARCH=amd64 go build -ldflags "-X main.version=${VERSION} -X main.commitHash=${COMMIT_HASH} -X main.buildDate=${BUILD_DATE} -X main.authUrl=$(AUTH_URL) -X main.authClientId=$(AUTH_CLIENT_ID) -X main.authSecret=$(AUTH_CLIENT_SECRET) -X main.sentryDSN=$(SENTRY_DSN)" -o ./build/release/ ./entry... + GOOS=linux GOARCH=amd64 go build -trimpath -tags "" -ldflags "-X main.version=${VERSION} -X main.commitHash=${COMMIT_HASH} -X main.buildDate=${BUILD_DATE} -X main.authUrl=$(AUTH_URL) -X main.authClientId=$(AUTH_CLIENT_ID) -X main.authSecret=$(AUTH_CLIENT_SECRET) -X main.sentryDSN=$(SENTRY_DSN)" -o ./build/release/ ./entry... mv build/release/entry build/release/ocf-amd64 - GOOS=linux GOARCH=arm64 go build -ldflags "-X main.version=${VERSION} -X main.commitHash=${COMMIT_HASH} -X main.buildDate=${BUILD_DATE} -X main.authUrl=$(AUTH_URL) -X main.authClientId=$(AUTH_CLIENT_ID) -X main.authSecret=$(AUTH_CLIENT_SECRET) -X main.sentryDSN=$(SENTRY_DSN)" -o ./build/release/ ./entry... + GOOS=linux GOARCH=arm64 go build -trimpath -tags "" -ldflags "-X main.version=${VERSION} -X main.commitHash=${COMMIT_HASH} -X main.buildDate=${BUILD_DATE} -X main.authUrl=$(AUTH_URL) -X main.authClientId=$(AUTH_CLIENT_ID) -X main.authSecret=$(AUTH_CLIENT_SECRET) -X main.sentryDSN=$(SENTRY_DSN)" -o ./build/release/ ./entry... mv build/release/entry build/release/ocf-arm64 From be65de342a0c7ca8d961353808daaae7c59313db Mon Sep 17 00:00:00 2001 From: Elia Palme Date: Tue, 12 Aug 2025 16:08:18 +0200 Subject: [PATCH 18/19] fix concurrent table access --- src/internal/protocol/node_table.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/internal/protocol/node_table.go b/src/internal/protocol/node_table.go index c3742f5..521e15e 100644 --- a/src/internal/protocol/node_table.go +++ b/src/internal/protocol/node_table.go @@ -117,6 +117,8 @@ func DeleteNodeTable() { store.Delete(ctx, key) } +var tableUpdateSem = make(chan struct{}, 1) // capacity 1 → max 1 goroutine at a time + func UpdateNodeTableHook(key ds.Key, value []byte) { table := *GetNodeTable() var peer Peer @@ -131,11 +133,16 @@ func UpdateNodeTableHook(key ds.Key, value []byte) { } // Always update LastSeen on any CRDT update we receive for that peer peer.LastSeen = time.Now().Unix() + + tableUpdateSem <- struct{}{} + defer func() { <-tableUpdateSem }() // Release on exit table[key.String()] = peer } func DeleteNodeTableHook(key ds.Key) { table := *GetNodeTable() + tableUpdateSem <- struct{}{} + defer func() { <-tableUpdateSem }() // Release on exit delete(table, key.String()) } From e7d975344c1aa58c48d9e2c820716dbafbe61f49 Mon Sep 17 00:00:00 2001 From: Elia Palme Date: Tue, 12 Aug 2025 17:11:58 +0200 Subject: [PATCH 19/19] extended protection from concurrent table access --- src/internal/protocol/node_table.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/internal/protocol/node_table.go b/src/internal/protocol/node_table.go index 521e15e..faa7bab 100644 --- a/src/internal/protocol/node_table.go +++ b/src/internal/protocol/node_table.go @@ -61,6 +61,7 @@ type PeerWithStatus struct { type NodeTable map[string]Peer var dnt *NodeTable +var tableUpdateSem = make(chan struct{}, 1) // capacity 1 → max 1 goroutine at a time func GetNodeTable() *NodeTable { dntOnce.Do(func() { @@ -117,14 +118,15 @@ func DeleteNodeTable() { store.Delete(ctx, key) } -var tableUpdateSem = make(chan struct{}, 1) // capacity 1 → max 1 goroutine at a time - func UpdateNodeTableHook(key ds.Key, value []byte) { table := *GetNodeTable() var peer Peer err := json.Unmarshal(value, &peer) common.ReportError(err, "Error while unmarshalling peer") // Preserve locally computed connectivity status if we already know this peer + tableUpdateSem <- struct{}{} + defer func() { <-tableUpdateSem }() // Release on exit + if existing, ok := table[key.String()]; ok { // If LastSeen is missing in the update, keep the existing one if peer.LastSeen == 0 { @@ -133,9 +135,6 @@ func UpdateNodeTableHook(key ds.Key, value []byte) { } // Always update LastSeen on any CRDT update we receive for that peer peer.LastSeen = time.Now().Unix() - - tableUpdateSem <- struct{}{} - defer func() { <-tableUpdateSem }() // Release on exit table[key.String()] = peer } @@ -148,6 +147,8 @@ func DeleteNodeTableHook(key ds.Key) { func GetPeerFromTable(peerId string) (Peer, error) { table := *GetNodeTable() + tableUpdateSem <- struct{}{} + defer func() { <-tableUpdateSem }() // Release on exit peer, ok := table["/"+peerId] if !ok { return Peer{}, errors.New("peer not found") @@ -157,6 +158,8 @@ func GetPeerFromTable(peerId string) (Peer, error) { func GetConnectedPeers() *NodeTable { var connected = NodeTable{} + tableUpdateSem <- struct{}{} + defer func() { <-tableUpdateSem }() // Release on exit for id, p := range *GetNodeTable() { if p.Connected { connected[id] = p @@ -185,6 +188,8 @@ func GetService(name string) (Service, error) { func GetAllProviders(serviceName string) ([]Peer, error) { var providers []Peer table := *GetNodeTable() + tableUpdateSem <- struct{}{} + defer func() { <-tableUpdateSem }() // Release on exit for _, peer := range table { if peer.Connected { for _, service := range peer.Service {