From faa31d22cc28aa0eb9c2005f44c94c122f1d6815 Mon Sep 17 00:00:00 2001 From: Elia Palme Date: Wed, 6 Aug 2025 13:56:39 +0200 Subject: [PATCH 1/4] save connection status --- src/internal/protocol/clock.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/internal/protocol/clock.go b/src/internal/protocol/clock.go index 0277407..41a8f89 100644 --- a/src/internal/protocol/clock.go +++ b/src/internal/protocol/clock.go @@ -1,6 +1,7 @@ package protocol import ( + "encoding/json" "math/rand" "ocf/internal/common" "ocf/internal/common/process" @@ -29,11 +30,19 @@ func StartTicker() { host, _ := GetP2PNode(nil) peers := host.Peerstore().Peers() // updateMyself() - for _, peer := range peers { + for _, peer_id := range peers { // check if peer is still connected - if peer != host.ID() && host.Network().Connectedness(peer) != network.Connected { - // delete peer from table - DeleteNodeTableHook(ds.NewKey(peer.String())) + peer, error := GetPeerFromTable(peer_id.String()) + if error == nil { + peer.Connected = true + if peer_id != host.ID() && host.Network().Connectedness(peer_id) != network.Connected { + common.Logger.Info("Peer:" + peer_id.String() + " got disconnected!") + peer.Connected = false + } + value, err := json.Marshal(peer) + if err == nil { + UpdateNodeTableHook(ds.NewKey(peer_id.String()), value) + } } } if !process.HealthCheck() { From 5ac86e21c4074356aa39021974bcf5e8ddd9c4db Mon Sep 17 00:00:00 2001 From: Elia Palme Date: Wed, 6 Aug 2025 13:57:08 +0200 Subject: [PATCH 2/4] newly added nodes are connected by default --- src/internal/protocol/crdt.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/internal/protocol/crdt.go b/src/internal/protocol/crdt.go index 05f93ea..ca995c6 100644 --- a/src/internal/protocol/crdt.go +++ b/src/internal/protocol/crdt.go @@ -2,6 +2,7 @@ package protocol import ( "context" + "encoding/json" "fmt" "ocf/internal/common" "sync" @@ -79,7 +80,16 @@ func GetCRDTStore() (*crdt.Datastore, context.CancelFunc) { opts.RebroadcastInterval = 5 * time.Second opts.PutHook = func(k ds.Key, v []byte) { fmt.Printf("Added: [%s] -> %s\n", k, string(v)) - UpdateNodeTableHook(k, v) + var peer Peer + err := json.Unmarshal(v, &peer) + common.ReportError(err, "Error while unmarshalling peer") + peer.Connected = true + value, err := json.Marshal(peer) + if err == nil { + UpdateNodeTableHook(k, value) + } else { + common.Logger.Error("Error while marshalling peer", err) + } } opts.DeleteHook = func(k ds.Key) { fmt.Printf("Removed: [%s]\n", k) From c6f00fc763e8f86bde63c8c632224bc23da77395 Mon Sep 17 00:00:00 2001 From: Elia Palme Date: Wed, 6 Aug 2025 13:58:24 +0200 Subject: [PATCH 3/4] clean up unused reachableOnly option --- src/internal/protocol/host.go | 2 +- src/internal/protocol/node_table.go | 38 +++++++++-------------------- src/internal/server/crdt_handler.go | 3 ++- 3 files changed, 15 insertions(+), 28 deletions(-) diff --git a/src/internal/protocol/host.go b/src/internal/protocol/host.go index 0888874..a6aa044 100644 --- a/src/internal/protocol/host.go +++ b/src/internal/protocol/host.go @@ -150,7 +150,7 @@ func AllPeers() []*PeerWithStatus { func ConnectedBootstraps() []string { var bootstraps = []string{} - dnt := GetNodeTable(false) + dnt := GetNodeTable() host, _ := GetP2PNode(nil) for _, p := range *dnt { if p.PublicAddress != "" { diff --git a/src/internal/protocol/node_table.go b/src/internal/protocol/node_table.go index d445ab0..55b52b5 100644 --- a/src/internal/protocol/node_table.go +++ b/src/internal/protocol/node_table.go @@ -9,8 +9,6 @@ import ( "sync" ds "github.com/ipfs/go-datastore" - "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/peer" "github.com/spf13/viper" ) @@ -49,6 +47,8 @@ type Peer struct { Version string `json:"version"` PublicAddress string `json:"public_address"` Hardware common.HardwareSpec `json:"hardware"` + Connected bool `json:"connected"` + Load []int `json:"load"` } type PeerWithStatus struct { @@ -61,26 +61,10 @@ type NodeTable map[string]Peer var dnt *NodeTable -func GetNodeTable(reachableOnly bool) *NodeTable { +func GetNodeTable() *NodeTable { dntOnce.Do(func() { dnt = &NodeTable{} }) - if reachableOnly { - host, _ := GetP2PNode(nil) - // filter out the nodes that are not connected - for _, p := range *dnt { - if host.Network().Connectedness(peer.ID(p.ID)) != network.Connected && p.ID != host.ID().String() { - // try to dial the peer - conn, err := host.Network().DialPeer(context.Background(), peer.ID(p.ID)) - if err != nil { - common.Logger.Info("Peer: ", p.ID, " removed from table: ", err) - // delete(*dnt, key) - } else { - defer conn.Close() - } - } - } - } return dnt } @@ -133,7 +117,7 @@ func DeleteNodeTable() { } func UpdateNodeTableHook(key ds.Key, value []byte) { - table := *GetNodeTable(false) + table := *GetNodeTable() var peer Peer err := json.Unmarshal(value, &peer) common.ReportError(err, "Error while unmarshalling peer") @@ -141,12 +125,12 @@ func UpdateNodeTableHook(key ds.Key, value []byte) { } func DeleteNodeTableHook(key ds.Key) { - table := *GetNodeTable(false) + table := *GetNodeTable() delete(table, key.String()) } func GetPeerFromTable(peerId string) (Peer, error) { - table := *GetNodeTable(false) + table := *GetNodeTable() peer, ok := table[peerId] if !ok { return Peer{}, errors.New("peer not found") @@ -173,11 +157,13 @@ func GetService(name string) (Service, error) { func GetAllProviders(serviceName string) ([]Peer, error) { var providers []Peer - table := *GetNodeTable(false) + table := *GetNodeTable() for _, peer := range table { - for _, service := range peer.Service { - if service.Name == serviceName { - providers = append(providers, peer) + if peer.Connected { + for _, service := range peer.Service { + if service.Name == serviceName { + providers = append(providers, peer) + } } } } diff --git a/src/internal/server/crdt_handler.go b/src/internal/server/crdt_handler.go index 34b3a4f..865795e 100644 --- a/src/internal/server/crdt_handler.go +++ b/src/internal/server/crdt_handler.go @@ -28,6 +28,7 @@ func listBootstraps(c *gin.Context) { func updateLocal(c *gin.Context) { var peer protocol.Peer c.BindJSON(&peer) + peer.Connected = true protocol.UpdateNodeTable(peer) } @@ -42,5 +43,5 @@ func getDNT(c *gin.Context) { {ingest.TimestampField: time.Now(), "event": "DNT Lookup"}, } IngestEvents(events) - c.JSON(200, protocol.GetNodeTable(false)) + c.JSON(200, protocol.GetNodeTable()) } From 840da0adebb3ba0914bb83665718eab81f53c031 Mon Sep 17 00:00:00 2001 From: Elia Palme Date: Wed, 6 Aug 2025 13:58:50 +0200 Subject: [PATCH 4/4] fix critical bug in get peer --- src/internal/protocol/node_table.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/internal/protocol/node_table.go b/src/internal/protocol/node_table.go index 55b52b5..25a480a 100644 --- a/src/internal/protocol/node_table.go +++ b/src/internal/protocol/node_table.go @@ -131,7 +131,7 @@ func DeleteNodeTableHook(key ds.Key) { func GetPeerFromTable(peerId string) (Peer, error) { table := *GetNodeTable() - peer, ok := table[peerId] + peer, ok := table["/"+peerId] if !ok { return Peer{}, errors.New("peer not found") }