diff --git a/core/core.go b/core/core.go index 7d97bde274..31237a2c0b 100644 --- a/core/core.go +++ b/core/core.go @@ -1,14 +1,10 @@ package core import ( - "bytes" "errors" routing "gx/ipfs/QmPR2JzfKd9poHx9XBhzoFeBBC31ZM3W5iUPKJZWyaoZZm/go-libp2p-routing" peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer" libp2p "gx/ipfs/QmaPbCnUMBohSGo3KnxEa2bHqyJVVeEEcwtqJAYxerieBo/go-libp2p-crypto" - gonet "net" - "net/http" - "net/url" "path" "time" @@ -27,6 +23,7 @@ import ( "github.com/op/go-logging" "golang.org/x/net/context" "golang.org/x/net/proxy" + "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid" "sync" ) @@ -83,8 +80,8 @@ type OpenBazaarNode struct { // A service that periodically fetches and caches the bitcoin exchange rates ExchangeRates bitcoin.ExchangeRates - // An optional gateway URL where we can crosspost data to ensure persistence - CrosspostGateways []*url.URL + // Optional nodes to push user data to + PushNodes []peer.ID // The user-agent for this node UserAgent string @@ -94,6 +91,9 @@ type OpenBazaarNode struct { // Manage blocked peers BanManager *net.BanManager + + // Allow other nodes to push data to this node for storage + AcceptStoreRequests bool } // Unpin the current node repo, re-add it, then publish to IPNS @@ -118,22 +118,28 @@ func (n *OpenBazaarNode) SeedNode() error { return aerr } seedLock.Unlock() + id, err := cid.Decode(rootHash) + if err != nil { + log.Error(err) + return err + } - for _, g := range n.CrosspostGateways { - go func(u *url.URL) { - req, err := http.NewRequest("PUT", u.String()+path.Join("ipfs", rootHash), new(bytes.Buffer)) + var graph []cid.Cid + if len(n.PushNodes) > 0 { + graph, err = ipfs.FetchGraph(n.IpfsNode.DAG, id) + if err != nil { + return err + } + } + for _, p := range n.PushNodes { + go func(pid peer.ID) { + err := n.SendStore(pid.Pretty(), graph) if err != nil { - return - } - dial := gonet.Dial - if n.TorDialer != nil { - dial = n.TorDialer.Dial + log.Errorf("Error pushing data to peer %s: %s", pid.Pretty(), err.Error()) } - tbTransport := &http.Transport{Dial: dial} - client := &http.Client{Transport: tbTransport, Timeout: time.Minute} - client.Do(req) - }(g) + }(p) } + go n.publish(rootHash) return nil } @@ -145,6 +151,7 @@ func (n *OpenBazaarNode) publish(hash string) { var err error inflightPublishRequests++ _, err = ipfs.Publish(n.Context, hash) + inflightPublishRequests-- if inflightPublishRequests == 0 { if err != nil { diff --git a/core/net.go b/core/net.go index 7e7b6e5c69..18b7db039b 100644 --- a/core/net.go +++ b/core/net.go @@ -1,22 +1,18 @@ package core import ( - ps "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore" multihash "gx/ipfs/QmU9a9NV9RdPNwZQDYd5uKsm6N6LJLSvLbywDDYFbaaC6P/go-multihash" peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer" libp2p "gx/ipfs/QmaPbCnUMBohSGo3KnxEa2bHqyJVVeEEcwtqJAYxerieBo/go-libp2p-crypto" - "bytes" + "errors" "github.com/OpenBazaar/openbazaar-go/ipfs" "github.com/OpenBazaar/openbazaar-go/pb" "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/any" "golang.org/x/net/context" - dhtpb "gx/ipfs/Qmcjua7379qzY63PJ5a8w3mDteHZppiX2zo6vFeaqjVcQi/go-libp2p-kad-dht/pb" - gonet "net" - "net/http" - "net/url" + "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid" "sync" "time" ) @@ -99,28 +95,19 @@ func (n *OpenBazaarNode) SendOfflineMessage(p peer.ID, k *libp2p.PubKey, m *pb.M if err != nil { log.Error(err) } - OfflineMessageWaitGroup.Done() - }() - // Post provider to gateway if we have one set in the config - if len(n.CrosspostGateways) > 0 { - dial := gonet.Dial - if n.TorDialer != nil { - dial = n.TorDialer.Dial - } - tbTransport := &http.Transport{Dial: dial} - client := &http.Client{Transport: tbTransport, Timeout: time.Minute} - pmes := dhtpb.NewMessage(dhtpb.Message_ADD_PROVIDER, pointer.Cid.KeyString(), 0) - pmes.ProviderPeers = dhtpb.RawPeerInfosToPBPeers([]ps.PeerInfo{pointer.Value}) - ser, err := proto.Marshal(pmes) - if err == nil { - for _, g := range n.CrosspostGateways { - go func(u *url.URL) { - client.Post(u.String()+"ipfs/providers", "application/x-www-form-urlencoded", bytes.NewReader(ser)) - }(g) + // Push provider to our push nodes for redundancy + for _, p := range n.PushNodes { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err := ipfs.PutPointerToPeer(n.IpfsNode, ctx, p, pointer) + if err != nil { + log.Error(err) } } - } + + OfflineMessageWaitGroup.Done() + }() return nil } @@ -524,3 +511,86 @@ func (n *OpenBazaarNode) SendModeratorRemove(peerId string) error { } return nil } + +func (n *OpenBazaarNode) SendBlock(peerId string, id cid.Cid) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + block, err := n.IpfsNode.Blocks.GetBlock(ctx, &id) + if err != nil { + return err + } + + b := &pb.Block{ + Cid: block.Cid().String(), + RawData: block.RawData(), + } + a, err := ptypes.MarshalAny(b) + if err != nil { + return err + } + m := pb.Message{ + MessageType: pb.Message_BLOCK, + Payload: a, + } + + p, err := peer.IDB58Decode(peerId) + if err != nil { + return err + } + return n.Service.SendMessage(context.Background(), p, &m) +} + +func (n *OpenBazaarNode) SendStore(peerId string, ids []cid.Cid) error { + var s []string + for _, d := range ids { + s = append(s, d.String()) + } + cList := new(pb.CidList) + cList.Cids = s + + a, err := ptypes.MarshalAny(cList) + if err != nil { + return err + } + + m := pb.Message{ + MessageType: pb.Message_STORE, + Payload: a, + } + + p, err := peer.IDB58Decode(peerId) + if err != nil { + return err + } + pmes, err := n.Service.SendRequest(context.Background(), p, &m) + if err != nil { + return err + } + defer n.Service.DisconnectFromPeer(p) + if pmes.Payload == nil { + return errors.New("Peer responded with nil payload") + } + if pmes.MessageType == pb.Message_ERROR { + log.Errorf("Error response from %s: %s", peerId, string(pmes.Payload.Value)) + return errors.New("Peer responded with error message") + } + + resp := new(pb.CidList) + err = ptypes.UnmarshalAny(pmes.Payload, resp) + if err != nil { + return err + } + if len(resp.Cids) == 0 { + log.Debugf("Peer %s requested no blocks", peerId) + return nil + } + log.Debugf("Sending %d blocks to %s", len(resp.Cids), peerId) + for _, id := range resp.Cids { + decoded, err := cid.Decode(id) + if err != nil { + continue + } + n.SendBlock(peerId, *decoded) + } + return nil +} diff --git a/ipfs/pointers.go b/ipfs/pointers.go index 8178338510..72361a881d 100644 --- a/ipfs/pointers.go +++ b/ipfs/pointers.go @@ -12,8 +12,6 @@ import ( multihash "gx/ipfs/QmU9a9NV9RdPNwZQDYd5uKsm6N6LJLSvLbywDDYFbaaC6P/go-multihash" ma "gx/ipfs/QmXY77cVe7rVRQXZZQRioukUM7aRW3BTcAgJe12MCtb3Ji/go-multiaddr" peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer" - ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io" - host "gx/ipfs/QmaSxYRuMq4pkpBBG2CYaRrPx2z7NmMVEs34b9g61biQA6/go-libp2p-host" "github.com/ipfs/go-ipfs/core" @@ -21,9 +19,8 @@ import ( "time" routing "gx/ipfs/Qmcjua7379qzY63PJ5a8w3mDteHZppiX2zo6vFeaqjVcQi/go-libp2p-kad-dht" + dhtpb "gx/ipfs/Qmcjua7379qzY63PJ5a8w3mDteHZppiX2zo6vFeaqjVcQi/go-libp2p-kad-dht/pb" pb "gx/ipfs/Qmcjua7379qzY63PJ5a8w3mDteHZppiX2zo6vFeaqjVcQi/go-libp2p-kad-dht/pb" - - ctxio "github.com/jbenet/go-context/io" ) const MAGIC string = "000000000000000000000000" @@ -91,9 +88,23 @@ func FindPointers(dht *routing.IpfsDHT, ctx context.Context, mhKey multihash.Mul return providers, nil } +func PutPointerToPeer(node *core.IpfsNode, ctx context.Context, peer peer.ID, pointer Pointer) error { + dht := node.Routing.(*routing.IpfsDHT) + return putPointer(ctx, dht, peer, pointer.Value, pointer.Cid.KeyString()) +} + +func GetPointersFromPeer(node *core.IpfsNode, ctx context.Context, p peer.ID, key *cid.Cid) ([]*ps.PeerInfo, error) { + dht := node.Routing.(*routing.IpfsDHT) + pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key.KeyString(), 0) + resp, err := dht.SendRequest(ctx, p, pmes) + if err != nil { + return []*ps.PeerInfo{}, err + } + return dhtpb.PBPeersToPeerInfos(resp.GetProviderPeers()), nil +} + func addPointer(node *core.IpfsNode, ctx context.Context, k *cid.Cid, pi ps.PeerInfo) error { dht := node.Routing.(*routing.IpfsDHT) - peerHosts := node.PeerHost peers, err := dht.GetClosestPeers(ctx, k.KeyString()) if err != nil { return err @@ -103,40 +114,24 @@ func addPointer(node *core.IpfsNode, ctx context.Context, k *cid.Cid, pi ps.Peer wg.Add(1) go func(p peer.ID) { defer wg.Done() - putPointer(ctx, peerHosts.(host.Host), p, pi, k.KeyString()) + putPointer(ctx, dht, p, pi, k.KeyString()) }(p) } wg.Wait() return nil } -func putPointer(ctx context.Context, peerHosts host.Host, p peer.ID, pi ps.PeerInfo, skey string) error { +func putPointer(ctx context.Context, dht *routing.IpfsDHT, p peer.ID, pi ps.PeerInfo, skey string) error { pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, skey, 0) pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]ps.PeerInfo{pi}) - err := sendMessage(ctx, peerHosts, p, pmes) + err := dht.SendMessage(ctx, p, pmes) if err != nil { return err } return nil } -func sendMessage(ctx context.Context, host host.Host, p peer.ID, pmes *pb.Message) error { - s, err := host.NewStream(ctx, p, routing.ProtocolDHT) - if err != nil { - return err - } - defer s.Close() - - cw := ctxio.NewWriter(ctx, s) - w := ggio.NewDelimitedWriter(cw) - - if err := w.WriteMsg(pmes); err != nil { - return err - } - return nil -} - func CreatePointerKey(mh multihash.Multihash, prefixLen int) multihash.Multihash { // Grab the first 8 bytes from the multihash digest m, _ := multihash.Decode(mh) diff --git a/net/networkservice.go b/net/networkservice.go index 2c3479702d..cb3e1d0d35 100644 --- a/net/networkservice.go +++ b/net/networkservice.go @@ -23,4 +23,7 @@ type NetworkService interface { // Send a message to a peer without requiring a response SendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error + + // Disconnect from the given peer + DisconnectFromPeer(p peer.ID) error } diff --git a/net/repointer/repointer.go b/net/repointer/repointer.go index f3735b9331..ebf3db745e 100644 --- a/net/repointer/repointer.go +++ b/net/repointer/repointer.go @@ -8,6 +8,7 @@ import ( "github.com/ipfs/go-ipfs/core" "github.com/op/go-logging" "golang.org/x/net/context" + "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer" ) var log = logging.MustGetLogger("service") @@ -18,13 +19,15 @@ const kPointerExpiration = time.Hour * 24 * 30 type PointerRepublisher struct { ipfsNode *core.IpfsNode db repo.Datastore + pushNodes []peer.ID isModerator func() bool } -func NewPointerRepublisher(node *core.IpfsNode, database repo.Datastore, isModerator func() bool) *PointerRepublisher { +func NewPointerRepublisher(node *core.IpfsNode, database repo.Datastore, pushNodes []peer.ID, isModerator func() bool) *PointerRepublisher { return &PointerRepublisher{ ipfsNode: node, db: database, + pushNodes: pushNodes, isModerator: isModerator, } } @@ -46,6 +49,7 @@ func (r *PointerRepublisher) Republish() { return } ctx := context.Background() + for _, p := range pointers { switch p.Purpose { case ipfs.MESSAGE: @@ -53,6 +57,9 @@ func (r *PointerRepublisher) Republish() { r.db.Pointers().Delete(p.Value.ID) } else { go ipfs.PublishPointer(r.ipfsNode, ctx, p) + for _, peer := range r.pushNodes { + go ipfs.PutPointerToPeer(r.ipfsNode, context.Background(), peer, p) + } } case ipfs.MODERATOR: if republishModerator { diff --git a/net/retriever/retriever.go b/net/retriever/retriever.go index 9f977a86b0..067520715c 100644 --- a/net/retriever/retriever.go +++ b/net/retriever/retriever.go @@ -8,13 +8,14 @@ import ( "github.com/OpenBazaar/openbazaar-go/repo" "github.com/golang/protobuf/proto" "github.com/ipfs/go-ipfs/commands" + "golang.org/x/net/proxy" + "github.com/ipfs/go-ipfs/core" routing "gx/ipfs/Qmcjua7379qzY63PJ5a8w3mDteHZppiX2zo6vFeaqjVcQi/go-libp2p-kad-dht" - dhtpb "gx/ipfs/Qmcjua7379qzY63PJ5a8w3mDteHZppiX2zo6vFeaqjVcQi/go-libp2p-kad-dht/pb" "github.com/op/go-logging" - "golang.org/x/net/proxy" + "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid" ps "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore" multihash "gx/ipfs/QmU9a9NV9RdPNwZQDYd5uKsm6N6LJLSvLbywDDYFbaaC6P/go-multihash" ma "gx/ipfs/QmXY77cVe7rVRQXZZQRioukUM7aRW3BTcAgJe12MCtb3Ji/go-multiaddr" @@ -23,7 +24,6 @@ import ( "io/ioutil" gonet "net" "net/http" - "net/url" "sync" "time" ) @@ -33,17 +33,17 @@ const DefaultPointerPrefixLength = 14 var log = logging.MustGetLogger("retriever") type MessageRetriever struct { - db repo.Datastore - node *core.IpfsNode - bm *net.BanManager - ctx commands.Context - service net.NetworkService - prefixLen int - sendAck func(peerId string, pointerID peer.ID) error - messageQueue map[pb.Message_MessageType][]offlineMessage - httpClient *http.Client - crosspostGateways []*url.URL - queueLock *sync.Mutex + db repo.Datastore + node *core.IpfsNode + bm *net.BanManager + ctx commands.Context + service net.NetworkService + prefixLen int + sendAck func(peerId string, pointerID peer.ID) error + messageQueue map[pb.Message_MessageType][]offlineMessage + httpClient *http.Client + dataPeers []peer.ID + queueLock *sync.Mutex *sync.WaitGroup } @@ -52,32 +52,36 @@ type offlineMessage struct { env pb.Envelope } -func NewMessageRetriever(db repo.Datastore, ctx commands.Context, node *core.IpfsNode, bm *net.BanManager, service net.NetworkService, prefixLen int, dialer proxy.Dialer, crosspostGateways []*url.URL, sendAck func(peerId string, pointerID peer.ID) error) *MessageRetriever { +func NewMessageRetriever(db repo.Datastore, ctx commands.Context, node *core.IpfsNode, bm *net.BanManager, service net.NetworkService, prefixLen int, pushNodes []peer.ID, dialer proxy.Dialer, sendAck func(peerId string, pointerID peer.ID) error) *MessageRetriever { dial := gonet.Dial if dialer != nil { dial = dialer.Dial } tbTransport := &http.Transport{Dial: dial} client := &http.Client{Transport: tbTransport, Timeout: time.Second * 30} - mr := MessageRetriever{db, node, bm, ctx, service, prefixLen, sendAck, make(map[pb.Message_MessageType][]offlineMessage), client, crosspostGateways, new(sync.Mutex), new(sync.WaitGroup)} + mr := MessageRetriever{db, node, bm, ctx, service, prefixLen, sendAck, make(map[pb.Message_MessageType][]offlineMessage), client, pushNodes, new(sync.Mutex), new(sync.WaitGroup)} // Add one for initial wait at start up mr.Add(1) return &mr } func (m *MessageRetriever) Run() { - tick := time.NewTicker(time.Hour) - defer tick.Stop() - go m.fetchPointers() + dht := time.NewTicker(time.Hour) + peers := time.NewTicker(time.Minute * 10) + defer dht.Stop() + defer peers.Stop() + go m.fetchPointers(true) for { select { - case <-tick.C: - go m.fetchPointers() + case <-dht.C: + go m.fetchPointers(true) + case <-peers.C: + go m.fetchPointers(false) } } } -func (m *MessageRetriever) fetchPointers() { +func (m *MessageRetriever) fetchPointers(useDHT bool) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() wg := new(sync.WaitGroup) @@ -87,21 +91,24 @@ func (m *MessageRetriever) fetchPointers() { peerOut := make(chan ps.PeerInfo) go func(c chan ps.PeerInfo) { pwg := new(sync.WaitGroup) - pwg.Add(2) + pwg.Add(1) go func(c chan ps.PeerInfo) { - out := m.getPointersFromGateway() + out := m.getPointersDataPeers() for p := range out { c <- p } pwg.Done() }(c) - go func(c chan ps.PeerInfo) { - iout := ipfs.FindPointersAsync(m.node.Routing.(*routing.IpfsDHT), ctx, mh, m.prefixLen) - for p := range iout { - c <- p - } - pwg.Done() - }(c) + if useDHT { + pwg.Add(1) + go func(c chan ps.PeerInfo) { + iout := ipfs.FindPointersAsync(m.node.Routing.(*routing.IpfsDHT), ctx, mh, m.prefixLen) + for p := range iout { + c <- p + } + pwg.Done() + }(c) + } pwg.Wait() close(c) }(peerOut) @@ -152,41 +159,34 @@ func (m *MessageRetriever) fetchPointers() { } } -func (m *MessageRetriever) getPointersFromGateway() <-chan ps.PeerInfo { +func (m *MessageRetriever) getPointersDataPeers() <-chan ps.PeerInfo { peerOut := make(chan ps.PeerInfo, 100000) - go m.getPointersFromGatewayRoutine(peerOut) + go m.getPointersFromDataPeersRoutine(peerOut) return peerOut } -func (m *MessageRetriever) getPointersFromGatewayRoutine(peerOut chan ps.PeerInfo) { +func (m *MessageRetriever) getPointersFromDataPeersRoutine(peerOut chan ps.PeerInfo) { defer close(peerOut) mh, _ := multihash.FromB58String(m.node.Identity.Pretty()) keyhash := ipfs.CreatePointerKey(mh, DefaultPointerPrefixLength) - for _, g := range m.crosspostGateways { - resp, err := m.httpClient.Get(g.String() + "ipfs/providers/" + keyhash.B58String()) - if err != nil { - log.Errorf("Error retrieving offline message from gateway: %s", err.Error()) - return - } - if resp.StatusCode != http.StatusOK { - return - } - buf, err := ioutil.ReadAll(resp.Body) - if err != nil { - log.Errorf("Error reading message from gateway: %s", err.Error()) - return - } - pmes := new(dhtpb.Message) - err = proto.Unmarshal(buf, pmes) - if err != nil { - log.Errorf("Error unmarshalling pointer from gateway: %s", err.Error()) - return - } - provs := dhtpb.PBPeersToPeerInfos(pmes.GetProviderPeers()) - for _, pi := range provs { - peerOut <- *pi - } + k, _ := cid.Decode(keyhash.B58String()) + var wg sync.WaitGroup + for _, p := range m.dataPeers { + wg.Add(1) + go func(pid peer.ID) { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + provs, err := ipfs.GetPointersFromPeer(m.node, ctx, pid, k) + if err != nil { + return + } + for _, pi := range provs { + peerOut <- *pi + } + }(p) } + wg.Wait() } func (m *MessageRetriever) fetchIPFS(pid peer.ID, ctx commands.Context, addr ma.Multiaddr, wg *sync.WaitGroup) { diff --git a/net/service/handlers.go b/net/service/handlers.go index 463606ae66..0380ccbf42 100644 --- a/net/service/handlers.go +++ b/net/service/handlers.go @@ -20,6 +20,8 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/any" + "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid" + blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format" "strconv" ) @@ -61,6 +63,10 @@ func (service *OpenBazaarService) HandlerForMsgType(t pb.Message_MessageType) fu return service.handleModeratorAdd case pb.Message_MODERATOR_REMOVE: return service.handleModeratorRemove + case pb.Message_BLOCK: + return service.handleBlock + case pb.Message_STORE: + return service.handleStore default: return nil } @@ -1325,3 +1331,85 @@ func (service *OpenBazaarService) handleModeratorRemove(pid peer.ID, pmes *pb.Me return nil, nil } + +func (service *OpenBazaarService) handleBlock(pid peer.ID, pmes *pb.Message, options interface{}) (*pb.Message, error) { + // If we aren't accepting store requests then ban this peer + if !service.node.AcceptStoreRequests { + service.node.BanManager.AddBlockedId(pid) + return nil, nil + } + + if pmes.Payload == nil { + return nil, errors.New("Payload is nil") + } + b := new(pb.Block) + err := ptypes.UnmarshalAny(pmes.Payload, b) + if err != nil { + return nil, err + } + id, err := cid.Decode(b.Cid) + if err != nil { + return nil, err + } + block, err := blocks.NewBlockWithCid(b.RawData, id) + if err != nil { + return nil, err + } + _, err = service.node.IpfsNode.Blocks.AddBlock(block) + if err != nil { + return nil, err + } + log.Debugf("Received BLOCK message from %s", pid.Pretty()) + return nil, nil +} + +func (service *OpenBazaarService) handleStore(pid peer.ID, pmes *pb.Message, options interface{}) (*pb.Message, error) { + // If we aren't accepting store requests then ban this peer + if !service.node.AcceptStoreRequests { + service.node.BanManager.AddBlockedId(pid) + return nil, nil + } + + errorResponse := func(error string) *pb.Message { + a := &any.Any{Value: []byte(error)} + m := &pb.Message{ + MessageType: pb.Message_ERROR, + Payload: a, + } + return m + } + + if pmes.Payload == nil { + return errorResponse("Payload is nil"), errors.New("Payload is nil") + } + cList := new(pb.CidList) + err := ptypes.UnmarshalAny(pmes.Payload, cList) + if err != nil { + return errorResponse("Could not unmarshall message"), err + } + var need []string + for _, id := range cList.Cids { + decoded, err := cid.Decode(id) + if err != nil { + continue + } + has, err := service.node.IpfsNode.Blockstore.Has(decoded) + if err != nil || !has { + need = append(need, decoded.String()) + } + } + log.Debugf("Received STORE message from %s", pid.Pretty()) + log.Debugf("Requesting %d blocks from %s", len(need), pid.Pretty()) + + resp := new(pb.CidList) + resp.Cids = need + a, err := ptypes.MarshalAny(resp) + if err != nil { + return errorResponse("Error marshalling response"), err + } + m := &pb.Message{ + MessageType: pb.Message_STORE, + Payload: a, + } + return m, nil +} diff --git a/net/service/service.go b/net/service/service.go index 09632c4723..127833c2cb 100644 --- a/net/service/service.go +++ b/net/service/service.go @@ -18,6 +18,7 @@ import ( "github.com/ipfs/go-ipfs/commands" ctxio "github.com/jbenet/go-context/io" "github.com/op/go-logging" + "io" ) var log = logging.MustGetLogger("service") @@ -54,13 +55,25 @@ func New(node *core.OpenBazaarNode, ctx commands.Context, datastore repo.Datasto return service } +func (service *OpenBazaarService) DisconnectFromPeer(p peer.ID) error { + log.Debugf("Disconnecting from %s", p.Pretty()) + service.senderlk.Lock() + defer service.senderlk.Unlock() + ms, ok := service.sender[p] + if !ok { + return nil + } + ms.s.Close() + delete(service.sender, p) + return nil +} + func (service *OpenBazaarService) HandleNewStream(s inet.Stream) { go service.handleNewMessage(s, true) } func (service *OpenBazaarService) handleNewMessage(s inet.Stream, incoming bool) { defer s.Close() - cr := ctxio.NewReader(service.ctx, s) // ok to use. we defer close stream in this func r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax) mPeer := s.Conn().RemotePeer() @@ -86,7 +99,9 @@ func (service *OpenBazaarService) handleNewMessage(s inet.Stream, incoming bool) pmes := new(pb.Message) if err := r.ReadMsg(pmes); err != nil { s.Reset() - log.Debugf("Error unmarshaling data: %s", err) + if err == io.EOF { + log.Debugf("Disconnected from peer %s", mPeer.Pretty()) + } return } @@ -169,10 +184,11 @@ func (service *OpenBazaarService) SendRequest(ctx context.Context, p peer.ID, pm } func (service *OpenBazaarService) SendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error { - log.Debugf("Sending %s message to %s", pmes.MessageType.String(), p.Pretty()) + if pmes.MessageType != pb.Message_BLOCK { + log.Debugf("Sending %s message to %s", pmes.MessageType.String(), p.Pretty()) + } ms, err := service.messageSenderForPeer(p) if err != nil { - log.Error("Error creating new message sender") return err } diff --git a/openbazaard.go b/openbazaard.go index 7ed0e9de54..436ff88b64 100644 --- a/openbazaard.go +++ b/openbazaard.go @@ -488,7 +488,7 @@ func (x *Start) Execute(args []string) error { log.Error(err) return err } - gatewayUrlStrings, err := repo.GetCrosspostGateway(configFile) + dataSharing, err := repo.GetDataSharing(configFile) if err != nil { log.Error(err) return err @@ -542,7 +542,7 @@ func (x *Start) Execute(args []string) error { bitswap.ProtocolBitswap = "/openbazaar/bitswap/testnet/1.1.0" service.ProtocolOpenBazaar = "/openbazaar/app/testnet/1.0.0" - gatewayUrlStrings = []string{} + dataSharing.PushTo = []string{} } onionAddr, err := obnet.MaybeCreateHiddenServiceKey(repoPath) @@ -797,17 +797,15 @@ func (x *Start) Execute(args []string) error { log.Fatal("Unknown wallet type") } - // Crosspost gateway - var gatewayUrls []*url.URL - for _, gw := range gatewayUrlStrings { - if gw != "" { - u, err := url.Parse(gw) - if err != nil { - log.Error(err) - return err - } - gatewayUrls = append(gatewayUrls, u) + // Push nodes + var pushNodes []peer.ID + for _, pnd := range dataSharing.PushTo { + p, err := peer.IDB58Decode(pnd) + if err != nil { + log.Error("Invalid peerID in DataSharing config") + return err } + pushNodes = append(pushNodes, p) } // Authenticated gateway @@ -865,32 +863,7 @@ func (x *Start) Execute(args []string) error { } } - // Offline messaging storage - var storage sto.OfflineMessagingStorage - if x.Storage == "self-hosted" || x.Storage == "" { - storage = selfhosted.NewSelfHostedStorage(repoPath, ctx, gatewayUrls, torDialer) - } else if x.Storage == "dropbox" { - if usingTor && !usingClearnet { - log.Error("Dropbox can not be used with Tor") - return errors.New("Dropbox can not be used with Tor") - } - - if dropboxToken == "" { - err = errors.New("Dropbox token not set in config file") - log.Error(err) - return err - } - storage, err = dropbox.NewDropBoxStorage(dropboxToken) - if err != nil { - log.Error(err) - return err - } - } else { - err = errors.New("Invalid storage option") - log.Error(err) - return err - } - + // Exchange rates var exchangeRates bitcoin.ExchangeRates if !x.DisableExchangeRates { exchangeRates = exchange.NewBitcoinPriceFetcher(torDialer) @@ -929,20 +902,47 @@ func (x *Start) Execute(args []string) error { // OpenBazaar node setup core.Node = &core.OpenBazaarNode{ - Context: ctx, - IpfsNode: nd, - RootHash: ipath.Path(e.Value).String(), - RepoPath: repoPath, - Datastore: sqliteDB, - Wallet: cryptoWallet, - MessageStorage: storage, - NameSystem: ns, - ExchangeRates: exchangeRates, - CrosspostGateways: gatewayUrls, - TorDialer: torDialer, - UserAgent: core.USERAGENT, - BanManager: bm, + Context: ctx, + IpfsNode: nd, + RootHash: ipath.Path(e.Value).String(), + RepoPath: repoPath, + Datastore: sqliteDB, + Wallet: cryptoWallet, + NameSystem: ns, + ExchangeRates: exchangeRates, + PushNodes: pushNodes, + AcceptStoreRequests: dataSharing.AcceptStoreRequests, + TorDialer: torDialer, + UserAgent: core.USERAGENT, + BanManager: bm, + } + + // Offline messaging storage + var storage sto.OfflineMessagingStorage + if x.Storage == "self-hosted" || x.Storage == "" { + storage = selfhosted.NewSelfHostedStorage(repoPath, ctx, pushNodes, core.Node.SendStore) + } else if x.Storage == "dropbox" { + if usingTor && !usingClearnet { + log.Error("Dropbox can not be used with Tor") + return errors.New("Dropbox can not be used with Tor") + } + + if dropboxToken == "" { + err = errors.New("Dropbox token not set in config file") + log.Error(err) + return err + } + storage, err = dropbox.NewDropBoxStorage(dropboxToken) + if err != nil { + log.Error(err) + return err + } + } else { + err = errors.New("Invalid storage option") + log.Error(err) + return err } + core.Node.MessageStorage = storage if len(cfg.Addresses.Gateway) <= 0 { return ErrNoGateways @@ -958,12 +958,12 @@ func (x *Start) Execute(args []string) error { } go func() { - <-ipfscore.DefaultBootstrapConfig.DoneChan + <-dht.DefaultBootstrapConfig.DoneChan core.Node.Service = service.New(core.Node, ctx, sqliteDB) - MR := ret.NewMessageRetriever(sqliteDB, ctx, nd, bm, core.Node.Service, 14, torDialer, core.Node.CrosspostGateways, core.Node.SendOfflineAck) + MR := ret.NewMessageRetriever(sqliteDB, ctx, nd, bm, core.Node.Service, 14, core.Node.PushNodes, torDialer, core.Node.SendOfflineAck) go MR.Run() core.Node.MessageRetriever = MR - PR := rep.NewPointerRepublisher(nd, sqliteDB, core.Node.IsModerator) + PR := rep.NewPointerRepublisher(nd, sqliteDB, core.Node.PushNodes, core.Node.IsModerator) go PR.Run() core.Node.PointerRepublisher = PR if !x.DisableWallet { diff --git a/pb/api.pb.go b/pb/api.pb.go index 0f88bf2cc9..75385eadb4 100644 --- a/pb/api.pb.go +++ b/pb/api.pb.go @@ -44,6 +44,8 @@ It has these top-level messages: Envelope Chat SignedData + CidList + Block Moderator DisputeUpdate Profile diff --git a/pb/message.pb.go b/pb/message.pb.go index f4bf5a87c8..93154b743a 100644 --- a/pb/message.pb.go +++ b/pb/message.pb.go @@ -36,6 +36,8 @@ const ( Message_OFFLINE_RELAY Message_MessageType = 15 Message_MODERATOR_ADD Message_MessageType = 16 Message_MODERATOR_REMOVE Message_MessageType = 17 + Message_STORE Message_MessageType = 18 + Message_BLOCK Message_MessageType = 19 Message_ERROR Message_MessageType = 500 ) @@ -58,6 +60,8 @@ var Message_MessageType_name = map[int32]string{ 15: "OFFLINE_RELAY", 16: "MODERATOR_ADD", 17: "MODERATOR_REMOVE", + 18: "STORE", + 19: "BLOCK", 500: "ERROR", } var Message_MessageType_value = map[string]int32{ @@ -79,6 +83,8 @@ var Message_MessageType_value = map[string]int32{ "OFFLINE_RELAY": 15, "MODERATOR_ADD": 16, "MODERATOR_REMOVE": 17, + "STORE": 18, + "BLOCK": 19, "ERROR": 500, } @@ -295,12 +301,54 @@ func (m *SignedData_Command) GetTimestamp() *google_protobuf.Timestamp { return nil } +type CidList struct { + Cids []string `protobuf:"bytes,1,rep,name=cids" json:"cids,omitempty"` +} + +func (m *CidList) Reset() { *m = CidList{} } +func (m *CidList) String() string { return proto.CompactTextString(m) } +func (*CidList) ProtoMessage() {} +func (*CidList) Descriptor() ([]byte, []int) { return fileDescriptor3, []int{4} } + +func (m *CidList) GetCids() []string { + if m != nil { + return m.Cids + } + return nil +} + +type Block struct { + RawData []byte `protobuf:"bytes,1,opt,name=rawData,proto3" json:"rawData,omitempty"` + Cid string `protobuf:"bytes,2,opt,name=cid" json:"cid,omitempty"` +} + +func (m *Block) Reset() { *m = Block{} } +func (m *Block) String() string { return proto.CompactTextString(m) } +func (*Block) ProtoMessage() {} +func (*Block) Descriptor() ([]byte, []int) { return fileDescriptor3, []int{5} } + +func (m *Block) GetRawData() []byte { + if m != nil { + return m.RawData + } + return nil +} + +func (m *Block) GetCid() string { + if m != nil { + return m.Cid + } + return "" +} + func init() { proto.RegisterType((*Message)(nil), "Message") proto.RegisterType((*Envelope)(nil), "Envelope") proto.RegisterType((*Chat)(nil), "Chat") proto.RegisterType((*SignedData)(nil), "SignedData") proto.RegisterType((*SignedData_Command)(nil), "SignedData.Command") + proto.RegisterType((*CidList)(nil), "CidList") + proto.RegisterType((*Block)(nil), "Block") proto.RegisterEnum("Message_MessageType", Message_MessageType_name, Message_MessageType_value) proto.RegisterEnum("Chat_Flag", Chat_Flag_name, Chat_Flag_value) } @@ -308,47 +356,51 @@ func init() { func init() { proto.RegisterFile("message.proto", fileDescriptor3) } var fileDescriptor3 = []byte{ - // 661 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x94, 0x93, 0x4f, 0x6f, 0xd3, 0x4c, - 0x10, 0xc6, 0xeb, 0xc4, 0xf9, 0x37, 0x4e, 0xd3, 0xed, 0xaa, 0x6f, 0x95, 0xb7, 0x42, 0x25, 0xf2, - 0x01, 0x85, 0x8b, 0x2b, 0x05, 0x09, 0x71, 0x35, 0xf6, 0xba, 0x18, 0x1c, 0x3b, 0xda, 0x38, 0xa0, - 0x72, 0x89, 0x1c, 0xbc, 0x35, 0x81, 0xc4, 0x36, 0xb1, 0x83, 0x14, 0xee, 0x7c, 0x3c, 0x3e, 0x02, - 0xdf, 0x82, 0x33, 0xa0, 0x5d, 0xdb, 0xa4, 0x2d, 0x52, 0x25, 0x4e, 0xd9, 0xf9, 0xcd, 0x93, 0xd9, - 0xf1, 0x3c, 0xb3, 0x70, 0xb8, 0x66, 0x59, 0x16, 0x44, 0x4c, 0x4b, 0x37, 0x49, 0x9e, 0x9c, 0xfd, - 0x1f, 0x25, 0x49, 0xb4, 0x62, 0x17, 0x22, 0x5a, 0x6c, 0xaf, 0x2f, 0x82, 0x78, 0x57, 0xa6, 0x1e, - 0xde, 0x4d, 0xe5, 0xcb, 0x35, 0xcb, 0xf2, 0x60, 0x9d, 0x16, 0x02, 0xf5, 0x57, 0x1d, 0x5a, 0xe3, - 0xa2, 0x1a, 0x7e, 0x0a, 0x4a, 0x59, 0xd8, 0xdf, 0xa5, 0xac, 0x2f, 0x0d, 0xa4, 0x61, 0x6f, 0x74, - 0xa2, 0x95, 0xe9, 0xea, 0x97, 0xe7, 0xe8, 0x4d, 0x21, 0xd6, 0xa0, 0x95, 0x06, 0xbb, 0x55, 0x12, - 0x84, 0xfd, 0xda, 0x40, 0x1a, 0x2a, 0xa3, 0x13, 0xad, 0xb8, 0x56, 0xab, 0xae, 0xd5, 0xf4, 0x78, - 0x47, 0x2b, 0x11, 0x7e, 0x00, 0x9d, 0x0d, 0xfb, 0xb4, 0x65, 0x59, 0x6e, 0x87, 0xfd, 0xfa, 0x40, - 0x1a, 0x36, 0xe8, 0x1e, 0xe0, 0x73, 0x80, 0x65, 0x46, 0x59, 0x96, 0x26, 0x71, 0xc6, 0xfa, 0xf2, - 0x40, 0x1a, 0xb6, 0xe9, 0x0d, 0xa2, 0x7e, 0xab, 0x81, 0x72, 0xa3, 0x15, 0xdc, 0x06, 0x79, 0x62, - 0xbb, 0x97, 0xe8, 0x80, 0x9f, 0x8c, 0x17, 0xba, 0x8f, 0x24, 0x0c, 0xd0, 0xb4, 0x3c, 0xc7, 0xf1, - 0xde, 0xa0, 0x1a, 0xee, 0x42, 0x7b, 0xe6, 0x96, 0x51, 0x1d, 0x77, 0xa0, 0xe1, 0x51, 0x93, 0x50, - 0x24, 0x63, 0x04, 0x5d, 0x71, 0x9c, 0x53, 0xf2, 0x92, 0x18, 0x3e, 0x6a, 0xec, 0x89, 0xa1, 0xbb, - 0x06, 0x71, 0x50, 0x13, 0x9f, 0x02, 0x2e, 0x89, 0xe7, 0x5a, 0x36, 0x1d, 0xeb, 0xbe, 0xed, 0xb9, - 0xa8, 0x85, 0xff, 0x83, 0xe3, 0x82, 0x5b, 0x33, 0xc7, 0xb2, 0x1d, 0x67, 0x4c, 0x5c, 0x1f, 0xb5, - 0xf1, 0x09, 0xa0, 0x4a, 0x3e, 0x9e, 0x38, 0x44, 0x88, 0x3b, 0xbc, 0xac, 0x69, 0x4f, 0x27, 0x33, - 0x9f, 0xcc, 0xbd, 0x09, 0x71, 0x11, 0x60, 0x0c, 0xbd, 0x8a, 0xcc, 0x26, 0xa6, 0xee, 0x13, 0xa4, - 0xe0, 0x63, 0x38, 0xac, 0x98, 0xe1, 0x78, 0x53, 0x82, 0xba, 0xfc, 0x33, 0x28, 0xb1, 0x66, 0xae, - 0x89, 0x0e, 0xf1, 0x11, 0x28, 0x9e, 0x65, 0x39, 0xb6, 0x4b, 0xe6, 0xba, 0xf1, 0x0a, 0xf5, 0xb8, - 0xbe, 0x02, 0x94, 0x38, 0xfa, 0x15, 0x3a, 0xe2, 0x68, 0xec, 0x99, 0x84, 0xea, 0xbe, 0x47, 0xe7, - 0xba, 0x69, 0x22, 0xc4, 0x3b, 0xda, 0x23, 0x4a, 0xc6, 0xde, 0x6b, 0x82, 0x8e, 0x31, 0x40, 0x83, - 0x50, 0xea, 0x51, 0xf4, 0xa3, 0xae, 0x86, 0xd0, 0x26, 0xf1, 0x67, 0xb6, 0x4a, 0x52, 0x86, 0x55, - 0x68, 0x95, 0xc6, 0x0a, 0xf7, 0x95, 0x51, 0xbb, 0x72, 0x9d, 0x56, 0x09, 0x7c, 0x0a, 0xcd, 0x74, - 0xbb, 0xf8, 0xc8, 0x76, 0xc2, 0xec, 0x2e, 0x2d, 0x23, 0xee, 0x6a, 0xb6, 0x8c, 0xe2, 0x20, 0xdf, - 0x6e, 0x98, 0x70, 0xb5, 0x4b, 0xf7, 0x40, 0xfd, 0x2e, 0x81, 0x6c, 0xbc, 0x0f, 0x72, 0x2e, 0x2b, - 0x2b, 0xd9, 0xa1, 0xb8, 0xa4, 0x43, 0xf7, 0x00, 0xf7, 0xa1, 0x95, 0x6d, 0x17, 0x1f, 0xd8, 0xbb, - 0x5c, 0x54, 0xef, 0xd0, 0x2a, 0xe4, 0x99, 0xaa, 0xb5, 0x7a, 0x91, 0xa9, 0x1a, 0x7a, 0x06, 0x9d, - 0x3f, 0x5b, 0x2d, 0xf6, 0x45, 0x19, 0x9d, 0xfd, 0xb5, 0x80, 0x7e, 0xa5, 0xa0, 0x7b, 0x31, 0x3e, - 0x07, 0xf9, 0x7a, 0x15, 0x44, 0xfd, 0x86, 0xd8, 0x74, 0xd0, 0x78, 0x83, 0x9a, 0xb5, 0x0a, 0x22, - 0x2a, 0xb8, 0xfa, 0x18, 0x64, 0x1e, 0x61, 0x05, 0x5a, 0x63, 0x32, 0x9d, 0xea, 0x97, 0x04, 0x1d, - 0x70, 0x53, 0xfc, 0x2b, 0xb1, 0x71, 0x12, 0xdf, 0x38, 0x4a, 0x74, 0x13, 0xd5, 0xd4, 0x9f, 0x12, - 0xc0, 0x74, 0x19, 0xc5, 0x2c, 0x34, 0x83, 0x3c, 0xc0, 0x2a, 0x74, 0x33, 0x16, 0x87, 0x6c, 0x33, - 0x29, 0x46, 0x25, 0x89, 0x79, 0xdc, 0x62, 0xf8, 0x11, 0xf4, 0x32, 0xb6, 0x59, 0x06, 0xab, 0xe5, - 0x97, 0xe2, 0x5f, 0xe5, 0x40, 0xef, 0xd0, 0xfb, 0x07, 0x7b, 0xf6, 0x55, 0x82, 0x96, 0x91, 0xac, - 0xd7, 0x41, 0x1c, 0x0a, 0x6b, 0x18, 0xdb, 0xd8, 0x66, 0x39, 0xd8, 0x32, 0xc2, 0x43, 0x90, 0x73, - 0xfe, 0xa2, 0x6b, 0xf7, 0xbc, 0x68, 0xa1, 0xb8, 0x3d, 0xcb, 0xfa, 0x3f, 0xcc, 0xf2, 0xb9, 0xfc, - 0xb6, 0x96, 0x2e, 0x16, 0x4d, 0x21, 0x7a, 0xf2, 0x3b, 0x00, 0x00, 0xff, 0xff, 0x05, 0x1e, 0xc9, - 0x01, 0xa2, 0x04, 0x00, 0x00, + // 726 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x94, 0x53, 0x41, 0x6f, 0xf3, 0x44, + 0x10, 0xad, 0x13, 0x27, 0x4e, 0xc6, 0x69, 0xba, 0x5d, 0x4a, 0x15, 0x2a, 0x28, 0x91, 0x0f, 0x28, + 0x5c, 0x5c, 0x29, 0x95, 0x10, 0x57, 0xd7, 0x5e, 0x17, 0x53, 0xc7, 0x8e, 0x36, 0x0e, 0xa8, 0x5c, + 0x22, 0x27, 0xde, 0x06, 0xd3, 0xc4, 0x36, 0xb1, 0x03, 0x0a, 0x77, 0x7e, 0x02, 0x3f, 0x8f, 0x13, + 0x7f, 0x81, 0x33, 0x42, 0xbb, 0xb6, 0x49, 0x5b, 0xa4, 0x4a, 0xdf, 0x29, 0x33, 0x6f, 0x5e, 0x66, + 0xc6, 0x6f, 0xde, 0xc2, 0xe9, 0x96, 0xe5, 0x79, 0xb8, 0x66, 0x7a, 0xb6, 0x4b, 0x8b, 0xf4, 0xea, + 0x93, 0x75, 0x9a, 0xae, 0x37, 0xec, 0x46, 0x64, 0xcb, 0xfd, 0xd3, 0x4d, 0x98, 0x1c, 0xaa, 0xd2, + 0xe7, 0x6f, 0x4b, 0x45, 0xbc, 0x65, 0x79, 0x11, 0x6e, 0xb3, 0x92, 0xa0, 0xfd, 0x21, 0x83, 0x32, + 0x29, 0xbb, 0xe1, 0xaf, 0x40, 0xad, 0x1a, 0x07, 0x87, 0x8c, 0x0d, 0xa4, 0xa1, 0x34, 0xea, 0x8f, + 0x2f, 0xf4, 0xaa, 0x5c, 0xff, 0xf2, 0x1a, 0x7d, 0x49, 0xc4, 0x3a, 0x28, 0x59, 0x78, 0xd8, 0xa4, + 0x61, 0x34, 0x68, 0x0c, 0xa5, 0x91, 0x3a, 0xbe, 0xd0, 0xcb, 0xb1, 0x7a, 0x3d, 0x56, 0x37, 0x92, + 0x03, 0xad, 0x49, 0xf8, 0x53, 0xe8, 0xee, 0xd8, 0xcf, 0x7b, 0x96, 0x17, 0x4e, 0x34, 0x68, 0x0e, + 0xa5, 0x51, 0x8b, 0x1e, 0x01, 0x7c, 0x0d, 0x10, 0xe7, 0x94, 0xe5, 0x59, 0x9a, 0xe4, 0x6c, 0x20, + 0x0f, 0xa5, 0x51, 0x87, 0xbe, 0x40, 0xb4, 0xbf, 0x1a, 0xa0, 0xbe, 0x58, 0x05, 0x77, 0x40, 0x9e, + 0x3a, 0xde, 0x3d, 0x3a, 0xe1, 0x91, 0xf9, 0x8d, 0x11, 0x20, 0x09, 0x03, 0xb4, 0x6d, 0xdf, 0x75, + 0xfd, 0xef, 0x51, 0x03, 0xf7, 0xa0, 0x33, 0xf7, 0xaa, 0xac, 0x89, 0xbb, 0xd0, 0xf2, 0xa9, 0x45, + 0x28, 0x92, 0x31, 0x82, 0x9e, 0x08, 0x17, 0x94, 0x7c, 0x4b, 0xcc, 0x00, 0xb5, 0x8e, 0x88, 0x69, + 0x78, 0x26, 0x71, 0x51, 0x1b, 0x5f, 0x02, 0xae, 0x10, 0xdf, 0xb3, 0x1d, 0x3a, 0x31, 0x02, 0xc7, + 0xf7, 0x90, 0x82, 0x3f, 0x86, 0xf3, 0x12, 0xb7, 0xe7, 0xae, 0xed, 0xb8, 0xee, 0x84, 0x78, 0x01, + 0xea, 0xe0, 0x0b, 0x40, 0x35, 0x7d, 0x32, 0x75, 0x89, 0x20, 0x77, 0x79, 0x5b, 0xcb, 0x99, 0x4d, + 0xe7, 0x01, 0x59, 0xf8, 0x53, 0xe2, 0x21, 0xc0, 0x18, 0xfa, 0x35, 0x32, 0x9f, 0x5a, 0x46, 0x40, + 0x90, 0x8a, 0xcf, 0xe1, 0xb4, 0xc6, 0x4c, 0xd7, 0x9f, 0x11, 0xd4, 0xe3, 0x9f, 0x41, 0x89, 0x3d, + 0xf7, 0x2c, 0x74, 0x8a, 0xcf, 0x40, 0xf5, 0x6d, 0xdb, 0x75, 0x3c, 0xb2, 0x30, 0xcc, 0x07, 0xd4, + 0xe7, 0xfc, 0x1a, 0xa0, 0xc4, 0x35, 0x1e, 0xd1, 0x19, 0x87, 0x26, 0xbe, 0x45, 0xa8, 0x11, 0xf8, + 0x74, 0x61, 0x58, 0x16, 0x42, 0x7c, 0xa3, 0x23, 0x44, 0xc9, 0xc4, 0xff, 0x8e, 0xa0, 0x73, 0xae, + 0xc2, 0x2c, 0xf0, 0x29, 0x41, 0x98, 0x87, 0x77, 0xae, 0x6f, 0x3e, 0xa0, 0x8f, 0x30, 0x40, 0x8b, + 0x50, 0xea, 0x53, 0xf4, 0x77, 0x53, 0x8b, 0xa0, 0x43, 0x92, 0x5f, 0xd8, 0x26, 0xcd, 0x18, 0xd6, + 0x40, 0xa9, 0xce, 0x2d, 0x3c, 0xa1, 0x8e, 0x3b, 0xb5, 0x17, 0x68, 0x5d, 0xc0, 0x97, 0xd0, 0xce, + 0xf6, 0xcb, 0x67, 0x76, 0x10, 0x16, 0xe8, 0xd1, 0x2a, 0xe3, 0xb7, 0xce, 0xe3, 0x75, 0x12, 0x16, + 0xfb, 0x1d, 0x13, 0xb7, 0xee, 0xd1, 0x23, 0xa0, 0xfd, 0x29, 0x81, 0x6c, 0xfe, 0x18, 0x16, 0x9c, + 0x56, 0x75, 0x72, 0x22, 0x31, 0xa4, 0x4b, 0x8f, 0x00, 0x1e, 0x80, 0x92, 0xef, 0x97, 0x3f, 0xb1, + 0x55, 0x21, 0xba, 0x77, 0x69, 0x9d, 0xf2, 0x4a, 0xbd, 0x5a, 0xb3, 0xac, 0xd4, 0x0b, 0x7d, 0x0d, + 0xdd, 0xff, 0xbc, 0x2e, 0x5c, 0xa4, 0x8e, 0xaf, 0xfe, 0x67, 0xcb, 0xa0, 0x66, 0xd0, 0x23, 0x19, + 0x5f, 0x83, 0xfc, 0xb4, 0x09, 0xd7, 0x83, 0x96, 0xf0, 0x3f, 0xe8, 0x7c, 0x41, 0xdd, 0xde, 0x84, + 0x6b, 0x2a, 0x70, 0xed, 0x4b, 0x90, 0x79, 0x86, 0x55, 0x50, 0x26, 0x64, 0x36, 0x33, 0xee, 0x09, + 0x3a, 0xe1, 0xa7, 0x0a, 0x1e, 0x85, 0x0f, 0x25, 0xee, 0x43, 0x4a, 0x0c, 0x0b, 0x35, 0xb4, 0x7f, + 0x24, 0x80, 0x59, 0xbc, 0x4e, 0x58, 0x64, 0x85, 0x45, 0x88, 0x35, 0xe8, 0xe5, 0x2c, 0x89, 0xd8, + 0x6e, 0x5a, 0x4a, 0x25, 0x09, 0x3d, 0x5e, 0x61, 0xf8, 0x0b, 0xe8, 0xe7, 0x6c, 0x17, 0x87, 0x9b, + 0xf8, 0xb7, 0xf2, 0x5f, 0x95, 0xa0, 0x6f, 0xd0, 0xf7, 0x85, 0xbd, 0xfa, 0x5d, 0x02, 0xc5, 0x4c, + 0xb7, 0xdb, 0x30, 0x89, 0xc4, 0x69, 0x18, 0xdb, 0x39, 0x56, 0x25, 0x6c, 0x95, 0xe1, 0x11, 0xc8, + 0x05, 0x7f, 0xe7, 0x8d, 0x77, 0xde, 0xb9, 0x60, 0xbc, 0xd6, 0xb2, 0xf9, 0x01, 0x5a, 0x6a, 0x9f, + 0x81, 0x62, 0xc6, 0x91, 0x1b, 0xe7, 0x05, 0xc6, 0x20, 0xaf, 0xe2, 0x28, 0x1f, 0x48, 0xc3, 0xe6, + 0xa8, 0x4b, 0x45, 0xac, 0xdd, 0x42, 0xeb, 0x6e, 0x93, 0xae, 0x9e, 0xf9, 0x1d, 0x77, 0xe1, 0xaf, + 0xe2, 0x73, 0x4b, 0x51, 0xea, 0x14, 0x23, 0x68, 0xae, 0xe2, 0xa8, 0xba, 0x3b, 0x0f, 0xef, 0xe4, + 0x1f, 0x1a, 0xd9, 0x72, 0xd9, 0x16, 0x83, 0x6f, 0xff, 0x0d, 0x00, 0x00, 0xff, 0xff, 0x3d, 0xc7, + 0x6a, 0x70, 0x0c, 0x05, 0x00, 0x00, } diff --git a/pb/protos/message.proto b/pb/protos/message.proto index 599632bd97..136d056ad3 100644 --- a/pb/protos/message.proto +++ b/pb/protos/message.proto @@ -30,6 +30,8 @@ message Message { OFFLINE_RELAY = 15; MODERATOR_ADD = 16; MODERATOR_REMOVE = 17; + STORE = 18; + BLOCK = 19; ERROR = 500; } } @@ -64,4 +66,13 @@ message SignedData { Message.MessageType type = 2; google.protobuf.Timestamp timestamp = 3; } +} + +message CidList { + repeated string cids = 1; +} + +message Block { + bytes rawData = 1; + string cid = 2; } \ No newline at end of file diff --git a/repo/config.go b/repo/config.go index c1c72b55d6..ef46e97563 100644 --- a/repo/config.go +++ b/repo/config.go @@ -16,6 +16,12 @@ var DefaultBootstrapAddresses = []string{ var TestnetBootstrapAddresses = []string{ "/ip4/165.227.117.91/tcp/4001/ipfs/Qmaa6De5QYNqShzPb9SGSo8vLmoUte8mnWgzn4GYwzuUYA", // Brooklyn Flea + "/ip4/46.101.221.165/tcp/4001/ipfs/QmVAQYg7ygAWTWegs8HSV2kdW1MqW8WMrmpqKG1PQtkgTC", // Shipshewana +} + +var DataPushNodes = []string{ + "QmY8puEnVx66uEet64gAf4VZRo7oUyMCwG6KdB9KM92EGQ", + "QmPPg2qeF3n2KvTRXRZLaTwHCw8JxzF4uZK93RfMoDvf2o", } type APIConfig struct { @@ -54,6 +60,11 @@ type WalletConfig struct { RPCPassword string } +type DataSharing struct { + AcceptStoreRequests bool + PushTo []string +} + var MalformedConfigError error = errors.New("Config file is malformed") func GetAPIConfig(cfgBytes []byte) (*APIConfig, error) { @@ -357,34 +368,52 @@ func GetDropboxApiToken(cfgBytes []byte) (string, error) { return tokenStr, nil } -func GetCrosspostGateway(cfgBytes []byte) ([]string, error) { +func GetDataSharing(cfgBytes []byte) (*DataSharing, error) { var cfgIface interface{} json.Unmarshal(cfgBytes, &cfgIface) - var urls []string + dataSharing := new(DataSharing) cfg, ok := cfgIface.(map[string]interface{}) if !ok { - return urls, MalformedConfigError + return dataSharing, MalformedConfigError } - gwys, ok := cfg["Crosspost-gateways"] + dscfg, ok := cfg["DataSharing"] if !ok { - return urls, MalformedConfigError + return dataSharing, MalformedConfigError } - gatewayList, ok := gwys.([]interface{}) + ds, ok := dscfg.(map[string]interface{}) if !ok { - return urls, MalformedConfigError + return dataSharing, MalformedConfigError } - for _, gw := range gatewayList { - gwStr, ok := gw.(string) + acceptcfg, ok := ds["AcceptStoreRequests"] + if !ok { + return dataSharing, MalformedConfigError + } + accept, ok := acceptcfg.(bool) + if !ok { + return dataSharing, MalformedConfigError + } + dataSharing.AcceptStoreRequests = accept + + pushcfg, ok := ds["PushTo"] + if !ok { + return dataSharing, MalformedConfigError + } + pushList, ok := pushcfg.([]interface{}) + if !ok { + return dataSharing, MalformedConfigError + } + + for _, nd := range pushList { + ndStr, ok := nd.(string) if !ok { - return urls, MalformedConfigError + return dataSharing, MalformedConfigError } - urls = append(urls, gwStr) + dataSharing.PushTo = append(dataSharing.PushTo, ndStr) } - - return urls, nil + return dataSharing, nil } func GetTestnetBootstrapAddrs(cfgBytes []byte) ([]string, error) { diff --git a/repo/init.go b/repo/init.go index 66f2aacd5a..fbe15e8d9b 100644 --- a/repo/init.go +++ b/repo/init.go @@ -16,7 +16,7 @@ import ( "time" ) -const RepoVersion = "2" +const RepoVersion = "3" var log = logging.MustGetLogger("repo") var ErrRepoExists = errors.New("IPFS configuration file exists. Reinitializing would overwrite your keys. Use -f to force overwrite.") @@ -217,6 +217,11 @@ func addConfigExtensions(repoRoot string, testnet bool) error { HTTPHeaders: nil, } + var ds DataSharing = DataSharing{ + AcceptStoreRequests: false, + PushTo: DataPushNodes, + } + var t TorConfig = TorConfig{} if err := extendConfigFile(r, "Wallet", w); err != nil { return err @@ -224,13 +229,13 @@ func addConfigExtensions(repoRoot string, testnet bool) error { var resolvers ResolverConfig = ResolverConfig{ Id: "https://resolver.onename.com/", } - if err := extendConfigFile(r, "Resolvers", resolvers); err != nil { + if err := extendConfigFile(r, "DataSharing", ds); err != nil { return err } - if err := extendConfigFile(r, "Bootstrap-testnet", TestnetBootstrapAddresses); err != nil { + if err := extendConfigFile(r, "Resolvers", resolvers); err != nil { return err } - if err := extendConfigFile(r, "Crosspost-gateways", []string{"https://gateway.ob1.io/", "https://gateway.duosear.ch/"}); err != nil { + if err := extendConfigFile(r, "Bootstrap-testnet", TestnetBootstrapAddresses); err != nil { return err } if err := extendConfigFile(r, "Dropbox-api-token", ""); err != nil { diff --git a/repo/migration.go b/repo/migration.go index 0a9ff34826..099784e921 100644 --- a/repo/migration.go +++ b/repo/migration.go @@ -16,6 +16,7 @@ type Migration interface { var Migrations = []Migration{ migrations.Migration000, migrations.Migration001, + migrations.Migration002, } // MigrateUp looks at the currently active migration version @@ -31,12 +32,14 @@ func MigrateUp(repoPath string) error { if err != nil { return err } + x := v for _, m := range Migrations[v:] { - log.Noticef("Migrationg repo to version %d\n", v+1) + log.Noticef("Migrationg repo to version %d\n", x+1) err := m.Up(repoPath) if err != nil { return err } + x++ } return nil } diff --git a/repo/migrations/Migration001_test.go b/repo/migrations/Migration001_test.go index 2d8b0b8909..5f09fcdac1 100644 --- a/repo/migrations/Migration001_test.go +++ b/repo/migrations/Migration001_test.go @@ -1,7 +1,6 @@ package migrations import ( - "fmt" "io/ioutil" "os" "strings" diff --git a/repo/migrations/Migration002.go b/repo/migrations/Migration002.go new file mode 100644 index 0000000000..bd36bcc7bb --- /dev/null +++ b/repo/migrations/Migration002.go @@ -0,0 +1,104 @@ +package migrations + +import ( + "encoding/json" + "errors" + "io/ioutil" + "os" + "path" +) + +var Migration002 migration002 + +type migration002 struct{} + +func (migration002) Up(repoPath string) error { + configFile, err := ioutil.ReadFile(path.Join(repoPath, "config")) + if err != nil { + return err + } + var cfgIface interface{} + json.Unmarshal(configFile, &cfgIface) + cfg, ok := cfgIface.(map[string]interface{}) + if !ok { + return errors.New("Invalid config file") + } + + pushNodes := []string{ + "QmY8puEnVx66uEet64gAf4VZRo7oUyMCwG6KdB9KM92EGQ", + "QmPPg2qeF3n2KvTRXRZLaTwHCw8JxzF4uZK93RfMoDvf2o", + } + cfg["DataSharing"] = map[string]interface{}{ + "AcceptStoreRequests": false, + "PushTo": pushNodes, + } + + delete(cfg, "Crosspost-gateways") + + out, err := json.MarshalIndent(cfg, "", " ") + if err != nil { + return err + } + f, err := os.Create(path.Join(repoPath, "config")) + if err != nil { + return err + } + _, err = f.Write(out) + if err != nil { + return err + } + f.Close() + + f1, err := os.Create(path.Join(repoPath, "repover")) + if err != nil { + return err + } + _, err = f1.Write([]byte("3")) + if err != nil { + return err + } + f1.Close() + return nil +} + +func (migration002) Down(repoPath string) error { + configFile, err := ioutil.ReadFile(path.Join(repoPath, "config")) + if err != nil { + return err + } + var cfgIface interface{} + json.Unmarshal(configFile, &cfgIface) + cfg, ok := cfgIface.(map[string]interface{}) + if !ok { + return errors.New("Invalid config file") + } + + cfg["Crosspost-gateways"] = []string{"https://gateway.ob1.io/", "https://gateway.duosear.ch/"} + + delete(cfg, "DataSharing") + + out, err := json.MarshalIndent(cfg, "", " ") + if err != nil { + return err + } + f, err := os.Create(path.Join(repoPath, "config")) + if err != nil { + return err + } + _, err = f.Write(out) + if err != nil { + return err + } + f.Close() + + f1, err := os.Create(path.Join(repoPath, "repover")) + if err != nil { + return err + } + _, err = f1.Write([]byte("2")) + if err != nil { + return err + } + f1.Close() + return nil +} diff --git a/repo/migrations/Migration002_test.go b/repo/migrations/Migration002_test.go new file mode 100644 index 0000000000..90ef7f0f44 --- /dev/null +++ b/repo/migrations/Migration002_test.go @@ -0,0 +1,74 @@ +package migrations + +import ( + "io/ioutil" + "os" + "strings" + "testing" +) + +var testConfig2 string = `{ + "Crosspost-gateways": [ + "https://gateway.ob1.io/", + "https://gateway.duosear.ch/" + ] +}` + +func TestMigration002(t *testing.T) { + f, err := os.Create("./config") + if err != nil { + t.Error(err) + } + f.Write([]byte(testConfig2)) + f.Close() + var m migration002 + + // Up + err = m.Up("./") + if err != nil { + t.Error(err) + } + newConfig, err := ioutil.ReadFile("./config") + if err != nil { + t.Error(err) + } + if !strings.Contains(string(newConfig), `DataSharing`) { + t.Error("Failed to write new DataSharing object") + } + if strings.Contains(string(newConfig), `Crosspost-gateways`) { + t.Error("Failed to delete Crosspost-gateways") + } + repoVer, err := ioutil.ReadFile("./repover") + if err != nil { + t.Error(err) + } + if string(repoVer) != "3" { + t.Error("Failed to write new repo version") + } + + // Down + err = m.Down("./") + if err != nil { + t.Error(err) + } + newConfig, err = ioutil.ReadFile("./config") + if err != nil { + t.Error(err) + } + if !strings.Contains(string(newConfig), `Crosspost-gateways`) { + t.Error("Failed to write new Crosspost-gateways") + } + if strings.Contains(string(newConfig), `DataSharing`) { + t.Errorf("Failed to delete DataSharing") + } + repoVer, err = ioutil.ReadFile("./repover") + if err != nil { + t.Error(err) + } + if string(repoVer) != "2" { + t.Error("Failed to write new repo version") + } + + os.Remove("./config") + os.Remove("./repover") +} diff --git a/repo/testdata/config b/repo/testdata/config index 9145f8d29d..1b68d8b0cc 100644 --- a/repo/testdata/config +++ b/repo/testdata/config @@ -19,9 +19,13 @@ "/ip4/139.59.174.197/tcp/4001/ipfs/QmZbLxbrPfGKjhFPwv9g7PkT5jL5DzQ8mF3iioByWMAprj", "/ip4/139.59.6.222/tcp/4001/ipfs/QmPZkv392E7VxumGSugQDEpfk6bHxfv271HTdVvdUu5Sod" ], - "Crosspost-gateways": [ - "http://gateway.ob1.io/" - ], + "DataSharing": { + "AcceptStoreRequests": false, + "PushTo": [ + "QmZbLxbrPfGKjhFPwv9g7PkT5jL5DzQ8mF3iioByWMAprj", + "QmPZkv392E7VxumGSugQDEpfk6bHxfv271HTdVvdUu5Sod" + ] + }, "Datastore": { "BloomFilterSize": 0, "GCPeriod": "1h", diff --git a/repo/testdata/repover b/repo/testdata/repover index 56a6051ca2..e440e5c842 100644 --- a/repo/testdata/repover +++ b/repo/testdata/repover @@ -1 +1 @@ -1 \ No newline at end of file +3 \ No newline at end of file diff --git a/storage/selfhosted/selfhostedstorage.go b/storage/selfhosted/selfhostedstorage.go index 47e170b2e5..cc7107b78c 100644 --- a/storage/selfhosted/selfhostedstorage.go +++ b/storage/selfhosted/selfhostedstorage.go @@ -8,35 +8,24 @@ import ( "os" "path" - "bytes" "github.com/OpenBazaar/openbazaar-go/ipfs" "github.com/ipfs/go-ipfs/commands" - "golang.org/x/net/proxy" - "net" - "net/http" - "net/url" - "time" + "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid" ) type SelfHostedStorage struct { - repoPath string - context commands.Context - crossPostGateways []*url.URL - httpClient *http.Client + repoPath string + context commands.Context + pushNodes []peer.ID + store func(peerId string, ids []cid.Cid) error } -func NewSelfHostedStorage(repoPath string, context commands.Context, crossPostGateways []*url.URL, dialer proxy.Dialer) *SelfHostedStorage { - dial := net.Dial - if dialer != nil { - dial = dialer.Dial - } - tbTransport := &http.Transport{Dial: dial} - client := &http.Client{Transport: tbTransport, Timeout: time.Minute} +func NewSelfHostedStorage(repoPath string, context commands.Context, pushNodes []peer.ID, store func(peerId string, ids []cid.Cid) error) *SelfHostedStorage { return &SelfHostedStorage{ - repoPath: repoPath, - context: context, - crossPostGateways: crossPostGateways, - httpClient: client, + repoPath: repoPath, + context: context, + pushNodes: pushNodes, + store: store, } } @@ -57,8 +46,12 @@ func (s *SelfHostedStorage) Store(peerID peer.ID, ciphertext []byte) (ma.Multiad if err != nil { return nil, err } - for _, g := range s.crossPostGateways { - s.httpClient.Post(g.String()+"ipfs/", "application/x-www-form-urlencoded", bytes.NewReader(ciphertext)) + id, err := cid.Decode(addr) + if err != nil { + return nil, err + } + for _, peer := range s.pushNodes { + go s.store(peer.Pretty(), []cid.Cid{*id}) } maAddr, err := ma.NewMultiaddr("/ipfs/" + addr + "/") if err != nil { diff --git a/vendor/github.com/ipfs/go-ipfs/core/bootstrap.go b/vendor/github.com/ipfs/go-ipfs/core/bootstrap.go index 2fc6a1b39e..4d913b7c65 100644 --- a/vendor/github.com/ipfs/go-ipfs/core/bootstrap.go +++ b/vendor/github.com/ipfs/go-ipfs/core/bootstrap.go @@ -51,9 +51,6 @@ type BootstrapConfig struct { // for the bootstrap process to use. This makes it possible for clients // to control the peers the process uses at any moment. BootstrapPeers func() []pstore.PeerInfo - - // Chan to return on when complete - DoneChan chan struct{} } // DefaultBootstrapConfig specifies default sane parameters for bootstrapping. @@ -61,7 +58,6 @@ var DefaultBootstrapConfig = BootstrapConfig{ MinPeerThreshold: 4, Period: 30 * time.Second, ConnectionTimeout: (30 * time.Second) / 3, // Perod / 3 - DoneChan: make(chan struct{}), } func BootstrapConfigWithPeers(pis []pstore.PeerInfo) BootstrapConfig { @@ -72,6 +68,8 @@ func BootstrapConfigWithPeers(pis []pstore.PeerInfo) BootstrapConfig { return cfg } +var bootstrapOnce sync.Once + // Bootstrap kicks off IpfsNode bootstrapping. This function will periodically // check the number of open connections and -- if there are too few -- initiate // connections to well-known bootstrap peers. It also kicks off subsystem @@ -80,9 +78,9 @@ func Bootstrap(n *IpfsNode, cfg BootstrapConfig) (io.Closer, error) { // make a signal to wait for one bootstrap round to complete. //doneWithRound := make(chan struct{}) + ch := make(chan struct{}) // the periodic bootstrap function -- the connection supervisor - var once sync.Once periodic := func(worker goprocess.Process) { ctx := procctx.OnClosingContext(worker) defer log.EventBegin(ctx, "periodicBootstrap", n.Identity).Done() @@ -92,24 +90,21 @@ func Bootstrap(n *IpfsNode, cfg BootstrapConfig) (io.Closer, error) { log.Debugf("%s bootstrap error: %s", n.Identity, err) } - once.Do(func() { - close(cfg.DoneChan) - }) - //<-doneWithRound + bootstrapOnce.Do(func() { close(ch) }) } // kick off the node's periodic bootstrapping proc := periodicproc.Tick(cfg.Period, periodic) proc.Go(periodic) // run one right now. - // kick off Routing.Bootstrap - if n.Routing != nil { - ctx := procctx.OnClosingContext(proc) - if err := n.Routing.Bootstrap(ctx); err != nil { - proc.Close() - return nil, err + go func() { + <-ch + // kick off Routing.Bootstrap + if n.Routing != nil { + ctx := procctx.OnClosingContext(proc) + n.Routing.Bootstrap(ctx) } - } + }() //doneWithRound <- struct{}{} //close(doneWithRound) // it no longer blocks periodic diff --git a/vendor/gx/ipfs/Qmcjua7379qzY63PJ5a8w3mDteHZppiX2zo6vFeaqjVcQi/go-libp2p-kad-dht/dht_bootstrap.go b/vendor/gx/ipfs/Qmcjua7379qzY63PJ5a8w3mDteHZppiX2zo6vFeaqjVcQi/go-libp2p-kad-dht/dht_bootstrap.go index 1fdee68ed4..709a4a61da 100644 --- a/vendor/gx/ipfs/Qmcjua7379qzY63PJ5a8w3mDteHZppiX2zo6vFeaqjVcQi/go-libp2p-kad-dht/dht_bootstrap.go +++ b/vendor/gx/ipfs/Qmcjua7379qzY63PJ5a8w3mDteHZppiX2zo6vFeaqjVcQi/go-libp2p-kad-dht/dht_bootstrap.go @@ -22,9 +22,10 @@ import ( // number of queries. We could support a higher period with less // queries. type BootstrapConfig struct { - Queries int // how many queries to run per period - Period time.Duration // how often to run periodi cbootstrap. - Timeout time.Duration // how long to wait for a bootstrao query to run + Queries int // how many queries to run per period + Period time.Duration // how often to run periodi cbootstrap. + Timeout time.Duration // how long to wait for a bootstrao query to run + DoneChan chan struct{} } var DefaultBootstrapConfig = BootstrapConfig{ @@ -40,8 +41,12 @@ var DefaultBootstrapConfig = BootstrapConfig{ Period: time.Duration(5 * time.Minute), Timeout: time.Duration(10 * time.Second), + + DoneChan: make(chan struct{}), } +var bootstrapOnce sync.Once + // Bootstrap ensures the dht routing table remains healthy as peers come and go. // it builds up a list of peers by requesting random peer IDs. The Bootstrap // process will run a number of queries each time, and run every time signal fires. @@ -78,6 +83,11 @@ func (dht *IpfsDHT) BootstrapWithConfig(cfg BootstrapConfig) (goprocess.Process, return nil, fmt.Errorf("invalid number of queries: %d", cfg.Queries) } + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + defer bootstrapOnce.Do(func() { close(DefaultBootstrapConfig.DoneChan) }) + + dht.runBootstrap(ctx, cfg) proc := periodicproc.Tick(cfg.Period, dht.bootstrapWorker(cfg)) return proc, nil @@ -107,7 +117,6 @@ func (dht *IpfsDHT) bootstrapWorker(cfg BootstrapConfig) func(worker goprocess.P return func(worker goprocess.Process) { // it would be useful to be able to send out signals of when we bootstrap, too... // maybe this is a good case for whole module event pub/sub? - ctx := dht.Context() if err := dht.runBootstrap(ctx, cfg); err != nil { log.Warning(err) diff --git a/vendor/gx/ipfs/Qmcjua7379qzY63PJ5a8w3mDteHZppiX2zo6vFeaqjVcQi/go-libp2p-kad-dht/dht_net.go b/vendor/gx/ipfs/Qmcjua7379qzY63PJ5a8w3mDteHZppiX2zo6vFeaqjVcQi/go-libp2p-kad-dht/dht_net.go index 6e81a2afec..72ce16ed21 100644 --- a/vendor/gx/ipfs/Qmcjua7379qzY63PJ5a8w3mDteHZppiX2zo6vFeaqjVcQi/go-libp2p-kad-dht/dht_net.go +++ b/vendor/gx/ipfs/Qmcjua7379qzY63PJ5a8w3mDteHZppiX2zo6vFeaqjVcQi/go-libp2p-kad-dht/dht_net.go @@ -74,6 +74,10 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) { } } +func (dht *IpfsDHT) SendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { + return dht.sendRequest(ctx, p, pmes) +} + // sendRequest sends out a request, but also makes sure to // measure the RTT for latency measurements. func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { @@ -98,6 +102,10 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message return rpmes, nil } +func (dht *IpfsDHT) SendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error { + return dht.sendMessage(ctx, p, pmes) +} + // sendMessage sends out a message func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error { ms, err := dht.messageSenderForPeer(p) diff --git a/vendor/gx/ipfs/Qmcjua7379qzY63PJ5a8w3mDteHZppiX2zo6vFeaqjVcQi/go-libp2p-kad-dht/routing.go b/vendor/gx/ipfs/Qmcjua7379qzY63PJ5a8w3mDteHZppiX2zo6vFeaqjVcQi/go-libp2p-kad-dht/routing.go index 38e0cbdd6f..51f55f87d6 100644 --- a/vendor/gx/ipfs/Qmcjua7379qzY63PJ5a8w3mDteHZppiX2zo6vFeaqjVcQi/go-libp2p-kad-dht/routing.go +++ b/vendor/gx/ipfs/Qmcjua7379qzY63PJ5a8w3mDteHZppiX2zo6vFeaqjVcQi/go-libp2p-kad-dht/routing.go @@ -276,6 +276,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key *cid.Cid, brdcst bool) erro wg.Wait() return nil } + func (dht *IpfsDHT) makeProvRecord(skey *cid.Cid) (*pb.Message, error) { pi := pstore.PeerInfo{ ID: dht.self,