Skip to content
This repository was archived by the owner on Mar 28, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 25 additions & 18 deletions core/core.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package core

import (
"bytes"
"errors"
routing "gx/ipfs/QmPR2JzfKd9poHx9XBhzoFeBBC31ZM3W5iUPKJZWyaoZZm/go-libp2p-routing"
peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
libp2p "gx/ipfs/QmaPbCnUMBohSGo3KnxEa2bHqyJVVeEEcwtqJAYxerieBo/go-libp2p-crypto"
gonet "net"
"net/http"
"net/url"
"path"
"time"

Expand All @@ -27,6 +23,7 @@ import (
"github.com/op/go-logging"
"golang.org/x/net/context"
"golang.org/x/net/proxy"
"gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
"sync"
)

Expand Down Expand Up @@ -83,8 +80,8 @@ type OpenBazaarNode struct {
// A service that periodically fetches and caches the bitcoin exchange rates
ExchangeRates bitcoin.ExchangeRates

// An optional gateway URL where we can crosspost data to ensure persistence
CrosspostGateways []*url.URL
// Optional nodes to push user data to
PushNodes []peer.ID

// The user-agent for this node
UserAgent string
Expand All @@ -94,6 +91,9 @@ type OpenBazaarNode struct {

// Manage blocked peers
BanManager *net.BanManager

// Allow other nodes to push data to this node for storage
AcceptStoreRequests bool
}

// Unpin the current node repo, re-add it, then publish to IPNS
Expand All @@ -118,22 +118,28 @@ func (n *OpenBazaarNode) SeedNode() error {
return aerr
}
seedLock.Unlock()
id, err := cid.Decode(rootHash)
if err != nil {
log.Error(err)
return err
}

for _, g := range n.CrosspostGateways {
go func(u *url.URL) {
req, err := http.NewRequest("PUT", u.String()+path.Join("ipfs", rootHash), new(bytes.Buffer))
var graph []cid.Cid
if len(n.PushNodes) > 0 {
graph, err = ipfs.FetchGraph(n.IpfsNode.DAG, id)
if err != nil {
return err
}
}
for _, p := range n.PushNodes {
go func(pid peer.ID) {
err := n.SendStore(pid.Pretty(), graph)
if err != nil {
return
}
dial := gonet.Dial
if n.TorDialer != nil {
dial = n.TorDialer.Dial
log.Errorf("Error pushing data to peer %s: %s", pid.Pretty(), err.Error())
}
tbTransport := &http.Transport{Dial: dial}
client := &http.Client{Transport: tbTransport, Timeout: time.Minute}
client.Do(req)
}(g)
}(p)
}

go n.publish(rootHash)
return nil
}
Expand All @@ -145,6 +151,7 @@ func (n *OpenBazaarNode) publish(hash string) {
var err error
inflightPublishRequests++
_, err = ipfs.Publish(n.Context, hash)

inflightPublishRequests--
if inflightPublishRequests == 0 {
if err != nil {
Expand Down
120 changes: 95 additions & 25 deletions core/net.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,18 @@
package core

import (
ps "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore"
multihash "gx/ipfs/QmU9a9NV9RdPNwZQDYd5uKsm6N6LJLSvLbywDDYFbaaC6P/go-multihash"
peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
libp2p "gx/ipfs/QmaPbCnUMBohSGo3KnxEa2bHqyJVVeEEcwtqJAYxerieBo/go-libp2p-crypto"

"bytes"
"errors"
"github.com/OpenBazaar/openbazaar-go/ipfs"
"github.com/OpenBazaar/openbazaar-go/pb"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
"golang.org/x/net/context"
dhtpb "gx/ipfs/Qmcjua7379qzY63PJ5a8w3mDteHZppiX2zo6vFeaqjVcQi/go-libp2p-kad-dht/pb"
gonet "net"
"net/http"
"net/url"
"gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
"sync"
"time"
)
Expand Down Expand Up @@ -99,28 +95,19 @@ func (n *OpenBazaarNode) SendOfflineMessage(p peer.ID, k *libp2p.PubKey, m *pb.M
if err != nil {
log.Error(err)
}
OfflineMessageWaitGroup.Done()
}()

// Post provider to gateway if we have one set in the config
if len(n.CrosspostGateways) > 0 {
dial := gonet.Dial
if n.TorDialer != nil {
dial = n.TorDialer.Dial
}
tbTransport := &http.Transport{Dial: dial}
client := &http.Client{Transport: tbTransport, Timeout: time.Minute}
pmes := dhtpb.NewMessage(dhtpb.Message_ADD_PROVIDER, pointer.Cid.KeyString(), 0)
pmes.ProviderPeers = dhtpb.RawPeerInfosToPBPeers([]ps.PeerInfo{pointer.Value})
ser, err := proto.Marshal(pmes)
if err == nil {
for _, g := range n.CrosspostGateways {
go func(u *url.URL) {
client.Post(u.String()+"ipfs/providers", "application/x-www-form-urlencoded", bytes.NewReader(ser))
}(g)
// Push provider to our push nodes for redundancy
for _, p := range n.PushNodes {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := ipfs.PutPointerToPeer(n.IpfsNode, ctx, p, pointer)
if err != nil {
log.Error(err)
}
}
}

OfflineMessageWaitGroup.Done()
}()
return nil
}

Expand Down Expand Up @@ -524,3 +511,86 @@ func (n *OpenBazaarNode) SendModeratorRemove(peerId string) error {
}
return nil
}

func (n *OpenBazaarNode) SendBlock(peerId string, id cid.Cid) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
block, err := n.IpfsNode.Blocks.GetBlock(ctx, &id)
if err != nil {
return err
}

b := &pb.Block{
Cid: block.Cid().String(),
RawData: block.RawData(),
}
a, err := ptypes.MarshalAny(b)
if err != nil {
return err
}
m := pb.Message{
MessageType: pb.Message_BLOCK,
Payload: a,
}

p, err := peer.IDB58Decode(peerId)
if err != nil {
return err
}
return n.Service.SendMessage(context.Background(), p, &m)
}

func (n *OpenBazaarNode) SendStore(peerId string, ids []cid.Cid) error {
var s []string
for _, d := range ids {
s = append(s, d.String())
}
cList := new(pb.CidList)
cList.Cids = s

a, err := ptypes.MarshalAny(cList)
if err != nil {
return err
}

m := pb.Message{
MessageType: pb.Message_STORE,
Payload: a,
}

p, err := peer.IDB58Decode(peerId)
if err != nil {
return err
}
pmes, err := n.Service.SendRequest(context.Background(), p, &m)
if err != nil {
return err
}
defer n.Service.DisconnectFromPeer(p)
if pmes.Payload == nil {
return errors.New("Peer responded with nil payload")
}
if pmes.MessageType == pb.Message_ERROR {
log.Errorf("Error response from %s: %s", peerId, string(pmes.Payload.Value))
return errors.New("Peer responded with error message")
}

resp := new(pb.CidList)
err = ptypes.UnmarshalAny(pmes.Payload, resp)
if err != nil {
return err
}
if len(resp.Cids) == 0 {
log.Debugf("Peer %s requested no blocks", peerId)
return nil
}
log.Debugf("Sending %d blocks to %s", len(resp.Cids), peerId)
for _, id := range resp.Cids {
decoded, err := cid.Decode(id)
if err != nil {
continue
}
n.SendBlock(peerId, *decoded)
}
return nil
}
43 changes: 19 additions & 24 deletions ipfs/pointers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,15 @@ import (
multihash "gx/ipfs/QmU9a9NV9RdPNwZQDYd5uKsm6N6LJLSvLbywDDYFbaaC6P/go-multihash"
ma "gx/ipfs/QmXY77cVe7rVRQXZZQRioukUM7aRW3BTcAgJe12MCtb3Ji/go-multiaddr"
peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io"
host "gx/ipfs/QmaSxYRuMq4pkpBBG2CYaRrPx2z7NmMVEs34b9g61biQA6/go-libp2p-host"

"github.com/ipfs/go-ipfs/core"

cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
"time"

routing "gx/ipfs/Qmcjua7379qzY63PJ5a8w3mDteHZppiX2zo6vFeaqjVcQi/go-libp2p-kad-dht"
dhtpb "gx/ipfs/Qmcjua7379qzY63PJ5a8w3mDteHZppiX2zo6vFeaqjVcQi/go-libp2p-kad-dht/pb"
pb "gx/ipfs/Qmcjua7379qzY63PJ5a8w3mDteHZppiX2zo6vFeaqjVcQi/go-libp2p-kad-dht/pb"

ctxio "github.com/jbenet/go-context/io"
)

const MAGIC string = "000000000000000000000000"
Expand Down Expand Up @@ -91,9 +88,23 @@ func FindPointers(dht *routing.IpfsDHT, ctx context.Context, mhKey multihash.Mul
return providers, nil
}

func PutPointerToPeer(node *core.IpfsNode, ctx context.Context, peer peer.ID, pointer Pointer) error {
dht := node.Routing.(*routing.IpfsDHT)
return putPointer(ctx, dht, peer, pointer.Value, pointer.Cid.KeyString())
}

func GetPointersFromPeer(node *core.IpfsNode, ctx context.Context, p peer.ID, key *cid.Cid) ([]*ps.PeerInfo, error) {
dht := node.Routing.(*routing.IpfsDHT)
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key.KeyString(), 0)
resp, err := dht.SendRequest(ctx, p, pmes)
if err != nil {
return []*ps.PeerInfo{}, err
}
return dhtpb.PBPeersToPeerInfos(resp.GetProviderPeers()), nil
}

func addPointer(node *core.IpfsNode, ctx context.Context, k *cid.Cid, pi ps.PeerInfo) error {
dht := node.Routing.(*routing.IpfsDHT)
peerHosts := node.PeerHost
peers, err := dht.GetClosestPeers(ctx, k.KeyString())
if err != nil {
return err
Expand All @@ -103,40 +114,24 @@ func addPointer(node *core.IpfsNode, ctx context.Context, k *cid.Cid, pi ps.Peer
wg.Add(1)
go func(p peer.ID) {
defer wg.Done()
putPointer(ctx, peerHosts.(host.Host), p, pi, k.KeyString())
putPointer(ctx, dht, p, pi, k.KeyString())
}(p)
}
wg.Wait()
return nil
}

func putPointer(ctx context.Context, peerHosts host.Host, p peer.ID, pi ps.PeerInfo, skey string) error {
func putPointer(ctx context.Context, dht *routing.IpfsDHT, p peer.ID, pi ps.PeerInfo, skey string) error {
pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, skey, 0)
pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]ps.PeerInfo{pi})

err := sendMessage(ctx, peerHosts, p, pmes)
err := dht.SendMessage(ctx, p, pmes)
if err != nil {
return err
}
return nil
}

func sendMessage(ctx context.Context, host host.Host, p peer.ID, pmes *pb.Message) error {
s, err := host.NewStream(ctx, p, routing.ProtocolDHT)
if err != nil {
return err
}
defer s.Close()

cw := ctxio.NewWriter(ctx, s)
w := ggio.NewDelimitedWriter(cw)

if err := w.WriteMsg(pmes); err != nil {
return err
}
return nil
}

func CreatePointerKey(mh multihash.Multihash, prefixLen int) multihash.Multihash {
// Grab the first 8 bytes from the multihash digest
m, _ := multihash.Decode(mh)
Expand Down
3 changes: 3 additions & 0 deletions net/networkservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,7 @@ type NetworkService interface {

// Send a message to a peer without requiring a response
SendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error

// Disconnect from the given peer
DisconnectFromPeer(p peer.ID) error
}
9 changes: 8 additions & 1 deletion net/repointer/repointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/ipfs/go-ipfs/core"
"github.com/op/go-logging"
"golang.org/x/net/context"
"gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
)

var log = logging.MustGetLogger("service")
Expand All @@ -18,13 +19,15 @@ const kPointerExpiration = time.Hour * 24 * 30
type PointerRepublisher struct {
ipfsNode *core.IpfsNode
db repo.Datastore
pushNodes []peer.ID
isModerator func() bool
}

func NewPointerRepublisher(node *core.IpfsNode, database repo.Datastore, isModerator func() bool) *PointerRepublisher {
func NewPointerRepublisher(node *core.IpfsNode, database repo.Datastore, pushNodes []peer.ID, isModerator func() bool) *PointerRepublisher {
return &PointerRepublisher{
ipfsNode: node,
db: database,
pushNodes: pushNodes,
isModerator: isModerator,
}
}
Expand All @@ -46,13 +49,17 @@ func (r *PointerRepublisher) Republish() {
return
}
ctx := context.Background()

for _, p := range pointers {
switch p.Purpose {
case ipfs.MESSAGE:
if time.Now().Sub(p.Timestamp) > kPointerExpiration {
r.db.Pointers().Delete(p.Value.ID)
} else {
go ipfs.PublishPointer(r.ipfsNode, ctx, p)
for _, peer := range r.pushNodes {
go ipfs.PutPointerToPeer(r.ipfsNode, context.Background(), peer, p)
}
}
case ipfs.MODERATOR:
if republishModerator {
Expand Down
Loading