forked from ipfs/kubo
-
Notifications
You must be signed in to change notification settings - Fork 1
/
centralized_client.go
126 lines (106 loc) · 3.44 KB
/
centralized_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package mockrouting
import (
"context"
"errors"
"time"
dshelp "github.com/ipfs/go-ipfs/thirdparty/ds-help"
"github.com/ipfs/go-ipfs/thirdparty/testutil"
routing "gx/ipfs/QmNdaQ8itUU9jEZUwTsG4gHMaPmRfi6FEe89QjQAFbep3M/go-libp2p-routing"
ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
dhtpb "gx/ipfs/QmWYCqr6UDqqD1bfRybaAPtbAqcN3TSJpveaBXMwbQ3ePZ/go-libp2p-record/pb"
u "gx/ipfs/QmWbjfz3u6HkAdPh34dgPchGbQjob6LXLhAeCGii2TX69n/go-ipfs-util"
pstore "gx/ipfs/QmXZSd1qR5BxZkPyuwfT5jpqQFScZccoZvDneXsKzCNHWX/go-libp2p-peerstore"
cid "gx/ipfs/QmYhQaCYEcaPPjxJX7YcPcVKkQfRy6sJ7B3XmGFk82XYdQ/go-cid"
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr"
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
)
var log = logging.Logger("mockrouter")
type client struct {
datastore ds.Datastore
server server
peer testutil.Identity
}
// FIXME(brian): is this method meant to simulate putting a value into the network?
func (c *client) PutValue(ctx context.Context, key string, val []byte) error {
log.Debugf("PutValue: %s", key)
rec := new(dhtpb.Record)
rec.Value = val
rec.Key = proto.String(string(key))
rec.TimeReceived = proto.String(u.FormatRFC3339(time.Now()))
data, err := proto.Marshal(rec)
if err != nil {
return err
}
return c.datastore.Put(dshelp.NewKeyFromBinary([]byte(key)), data)
}
// FIXME(brian): is this method meant to simulate getting a value from the network?
func (c *client) GetValue(ctx context.Context, key string) ([]byte, error) {
log.Debugf("GetValue: %s", key)
v, err := c.datastore.Get(dshelp.NewKeyFromBinary([]byte(key)))
if err != nil {
return nil, err
}
data, ok := v.([]byte)
if !ok {
return nil, errors.New("could not cast value from datastore")
}
rec := new(dhtpb.Record)
err = proto.Unmarshal(data, rec)
if err != nil {
return nil, err
}
return rec.GetValue(), nil
}
func (c *client) GetValues(ctx context.Context, key string, count int) ([]routing.RecvdVal, error) {
log.Debugf("GetValues: %s", key)
data, err := c.GetValue(ctx, key)
if err != nil {
return nil, err
}
return []routing.RecvdVal{{Val: data, From: c.peer.ID()}}, nil
}
func (c *client) FindProviders(ctx context.Context, key *cid.Cid) ([]pstore.PeerInfo, error) {
return c.server.Providers(key), nil
}
func (c *client) FindPeer(ctx context.Context, pid peer.ID) (pstore.PeerInfo, error) {
log.Debugf("FindPeer: %s", pid)
return pstore.PeerInfo{}, nil
}
func (c *client) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan pstore.PeerInfo {
out := make(chan pstore.PeerInfo)
go func() {
defer close(out)
for i, p := range c.server.Providers(k) {
if max <= i {
return
}
select {
case out <- p:
case <-ctx.Done():
return
}
}
}()
return out
}
// Provide returns once the message is on the network. Value is not necessarily
// visible yet.
func (c *client) Provide(_ context.Context, key *cid.Cid, brd bool) error {
if !brd {
return nil
}
info := pstore.PeerInfo{
ID: c.peer.ID(),
Addrs: []ma.Multiaddr{c.peer.Address()},
}
return c.server.Announce(info, key)
}
func (c *client) Ping(ctx context.Context, p peer.ID) (time.Duration, error) {
return 0, nil
}
func (c *client) Bootstrap(context.Context) error {
return nil
}
var _ routing.IpfsRouting = &client{}