Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions src/internal/protocol/clock.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package protocol

import (
"encoding/json"
"math/rand"
"ocf/internal/common"
"ocf/internal/common/process"
Expand Down Expand Up @@ -29,11 +30,19 @@ func StartTicker() {
host, _ := GetP2PNode(nil)
peers := host.Peerstore().Peers()
// updateMyself()
for _, peer := range peers {
for _, peer_id := range peers {
// check if peer is still connected
if peer != host.ID() && host.Network().Connectedness(peer) != network.Connected {
// delete peer from table
DeleteNodeTableHook(ds.NewKey(peer.String()))
peer, error := GetPeerFromTable(peer_id.String())
if error == nil {
peer.Connected = true
if peer_id != host.ID() && host.Network().Connectedness(peer_id) != network.Connected {
common.Logger.Info("Peer:" + peer_id.String() + " got disconnected!")
peer.Connected = false
}
value, err := json.Marshal(peer)
if err == nil {
UpdateNodeTableHook(ds.NewKey(peer_id.String()), value)
}
}
}
if !process.HealthCheck() {
Expand Down
12 changes: 11 additions & 1 deletion src/internal/protocol/crdt.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package protocol

import (
"context"
"encoding/json"
"fmt"
"ocf/internal/common"
"sync"
Expand Down Expand Up @@ -79,7 +80,16 @@ func GetCRDTStore() (*crdt.Datastore, context.CancelFunc) {
opts.RebroadcastInterval = 5 * time.Second
opts.PutHook = func(k ds.Key, v []byte) {
fmt.Printf("Added: [%s] -> %s\n", k, string(v))
UpdateNodeTableHook(k, v)
var peer Peer
err := json.Unmarshal(v, &peer)
common.ReportError(err, "Error while unmarshalling peer")
peer.Connected = true
value, err := json.Marshal(peer)
if err == nil {
UpdateNodeTableHook(k, value)
} else {
common.Logger.Error("Error while marshalling peer", err)
}
}
opts.DeleteHook = func(k ds.Key) {
fmt.Printf("Removed: [%s]\n", k)
Expand Down
2 changes: 1 addition & 1 deletion src/internal/protocol/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func AllPeers() []*PeerWithStatus {

func ConnectedBootstraps() []string {
var bootstraps = []string{}
dnt := GetNodeTable(false)
dnt := GetNodeTable()
host, _ := GetP2PNode(nil)
for _, p := range *dnt {
if p.PublicAddress != "" {
Expand Down
40 changes: 13 additions & 27 deletions src/internal/protocol/node_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"sync"

ds "github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/spf13/viper"
)

Expand Down Expand Up @@ -49,6 +47,8 @@ type Peer struct {
Version string `json:"version"`
PublicAddress string `json:"public_address"`
Hardware common.HardwareSpec `json:"hardware"`
Connected bool `json:"connected"`
Load []int `json:"load"`
}

type PeerWithStatus struct {
Expand All @@ -61,26 +61,10 @@ type NodeTable map[string]Peer

var dnt *NodeTable

func GetNodeTable(reachableOnly bool) *NodeTable {
func GetNodeTable() *NodeTable {
dntOnce.Do(func() {
dnt = &NodeTable{}
})
if reachableOnly {
host, _ := GetP2PNode(nil)
// filter out the nodes that are not connected
for _, p := range *dnt {
if host.Network().Connectedness(peer.ID(p.ID)) != network.Connected && p.ID != host.ID().String() {
// try to dial the peer
conn, err := host.Network().DialPeer(context.Background(), peer.ID(p.ID))
if err != nil {
common.Logger.Info("Peer: ", p.ID, " removed from table: ", err)
// delete(*dnt, key)
} else {
defer conn.Close()
}
}
}
}
return dnt
}

Expand Down Expand Up @@ -133,21 +117,21 @@ func DeleteNodeTable() {
}

func UpdateNodeTableHook(key ds.Key, value []byte) {
table := *GetNodeTable(false)
table := *GetNodeTable()
var peer Peer
err := json.Unmarshal(value, &peer)
common.ReportError(err, "Error while unmarshalling peer")
table[key.String()] = peer
}

func DeleteNodeTableHook(key ds.Key) {
table := *GetNodeTable(false)
table := *GetNodeTable()
delete(table, key.String())
}

func GetPeerFromTable(peerId string) (Peer, error) {
table := *GetNodeTable(false)
peer, ok := table[peerId]
table := *GetNodeTable()
peer, ok := table["/"+peerId]
if !ok {
return Peer{}, errors.New("peer not found")
}
Expand All @@ -173,11 +157,13 @@ func GetService(name string) (Service, error) {

func GetAllProviders(serviceName string) ([]Peer, error) {
var providers []Peer
table := *GetNodeTable(false)
table := *GetNodeTable()
for _, peer := range table {
for _, service := range peer.Service {
if service.Name == serviceName {
providers = append(providers, peer)
if peer.Connected {
for _, service := range peer.Service {
if service.Name == serviceName {
providers = append(providers, peer)
}
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/internal/server/crdt_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func listBootstraps(c *gin.Context) {
func updateLocal(c *gin.Context) {
var peer protocol.Peer
c.BindJSON(&peer)
peer.Connected = true
protocol.UpdateNodeTable(peer)
}

Expand All @@ -42,5 +43,5 @@ func getDNT(c *gin.Context) {
{ingest.TimestampField: time.Now(), "event": "DNT Lookup"},
}
IngestEvents(events)
c.JSON(200, protocol.GetNodeTable(false))
c.JSON(200, protocol.GetNodeTable())
}