Skip to content

Commit

Permalink
test: add peer http server, minor refactors and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
kylehuntsman committed May 11, 2023
1 parent 3fbcb2c commit 5331ef7
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 50 deletions.
14 changes: 14 additions & 0 deletions pkg/internal/itest/http_fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ func TestHttpFetch(t *testing.T) {
return []unixfs.DirEntry{unixfs.GenerateDirectory(t, &remotes[0].LinkSystem, rndReader, 16<<20, false)}
},
},
{
name: "http large directory",
httpRemotes: 1,
generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry {
return []unixfs.DirEntry{unixfs.GenerateDirectory(t, &remotes[0].LinkSystem, rndReader, 16<<20, false)}
},
},
{
name: "graphsync large sharded directory",
graphsyncRemotes: 1,
Expand All @@ -105,6 +112,13 @@ func TestHttpFetch(t *testing.T) {
return []unixfs.DirEntry{unixfs.GenerateDirectory(t, &remotes[0].LinkSystem, rndReader, 16<<20, true)}
},
},
{
name: "http large sharded directory",
httpRemotes: 1,
generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry {
return []unixfs.DirEntry{unixfs.GenerateDirectory(t, &remotes[0].LinkSystem, rndReader, 16<<20, true)}
},
},
{
name: "graphsync max block limit",
graphsyncRemotes: 1,
Expand Down
35 changes: 16 additions & 19 deletions pkg/internal/itest/mocknet/mocknet.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
lpmock "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/multiformats/go-multicodec"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -62,26 +63,19 @@ func NewMockRetrievalNet(ctx context.Context, t *testing.T) *MockRetrievalNet {
}

func (mrn *MockRetrievalNet) AddBitswapPeers(n int) {
peers := mrn.testPeerGenerator.BitswapPeers(n)
for i := 0; i < n; i++ {
mrn.Remotes = append(mrn.Remotes, peers[i])
mrn.RemoteEvents = append(mrn.RemoteEvents, make([]datatransfer.Event, 0)) // not used for bitswap
mrn.FinishedChan = append(mrn.FinishedChan, make(chan struct{}, 1)) // not used for bitswap
}
mrn.addPeers(mrn.testPeerGenerator.BitswapPeers(n))
}

func (mrn *MockRetrievalNet) AddGraphsyncPeers(n int) {
peers := mrn.testPeerGenerator.GraphsyncPeers(n)
for i := 0; i < n; i++ {
mrn.Remotes = append(mrn.Remotes, peers[i])
mrn.RemoteEvents = append(mrn.RemoteEvents, make([]datatransfer.Event, 0))
mrn.FinishedChan = append(mrn.FinishedChan, make(chan struct{}, 1))
}
mrn.addPeers(mrn.testPeerGenerator.GraphsyncPeers(n))
}

func (mrn *MockRetrievalNet) AddHttpPeers(n int) {
peers := mrn.testPeerGenerator.HttpPeers(n)
for i := 0; i < n; i++ {
mrn.addPeers(mrn.testPeerGenerator.HttpPeers(n))
}

func (mrn *MockRetrievalNet) addPeers(peers []testpeer.TestPeer) {
for i := 0; i < len(peers); i++ {
mrn.Remotes = append(mrn.Remotes, peers[i])
mrn.RemoteEvents = append(mrn.RemoteEvents, make([]datatransfer.Event, 0))
mrn.FinishedChan = append(mrn.FinishedChan, make(chan struct{}, 1))
Expand Down Expand Up @@ -139,7 +133,9 @@ func (mrn *MockRetrievalNet) TearDown() error {
if h.BitswapNetwork != nil {
h.BitswapNetwork.Stop()
}
// TODO: teardown http server
if h.HttpServer != nil {
h.HttpServer.Close()
}
}(h)
}
wg.Wait()
Expand All @@ -155,12 +151,13 @@ func (mcf *mockCandidateFinder) FindCandidates(ctx context.Context, cid cid.Cid)
for _, h := range mcf.mrn.Remotes {
if _, has := h.Cids[cid]; has {
var md metadata.Metadata
if h.Bitswap {
switch h.Protocol {
case multicodec.TransportBitswap:
md = metadata.Default.New(metadata.Bitswap{})
} else if h.Http {
md = metadata.Default.New(&metadata.IpfsGatewayHttp{})
} else if h.Graphsync {
case multicodec.TransportGraphsyncFilecoinv1:
md = metadata.Default.New(&metadata.GraphsyncFilecoinV1{PieceCID: cid})
case multicodec.TransportIpfsGatewayHttp:
md = metadata.Default.New(&metadata.IpfsGatewayHttp{})
}
candidates = append(candidates, types.RetrievalCandidate{MinerPeer: *h.AddrInfo(), RootCid: cid, Metadata: md})
}
Expand Down
86 changes: 55 additions & 31 deletions pkg/internal/itest/testpeer/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
dtimpl "github.com/filecoin-project/go-data-transfer/v2/impl"
dtnet "github.com/filecoin-project/go-data-transfer/v2/network"
gstransport "github.com/filecoin-project/go-data-transfer/v2/transport/graphsync"
"github.com/filecoin-project/lassie/pkg/retriever"
"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
Expand All @@ -31,6 +30,7 @@ import (
"github.com/ipfs/go-unixfsnode"
"github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/storage"
dagpb "github.com/ipld/go-codec-dagpb"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime/linking"
Expand All @@ -45,6 +45,7 @@ import (
peer "github.com/libp2p/go-libp2p/core/peer"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
ma "github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multicodec"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -162,17 +163,13 @@ type TestPeer struct {
BitswapServer *server.Server
BitswapNetwork bsnet.BitSwapNetwork
DatatransferServer datatransfer.Manager
ProtocolHttp *retriever.ProtocolHttp
HttpServer *TestPeerHttpServer
blockstore blockstore.Blockstore
Host host.Host
blockstoreDelay delay.D
LinkSystem linking.LinkSystem
Cids map[cid.Cid]struct{}

// TODO: Replace with enum
Bitswap bool
Graphsync bool
Http bool
Protocol multicodec.Code
}

// Blockstore returns the block store for this test instance
Expand Down Expand Up @@ -212,7 +209,7 @@ func NewTestBitswapPeer(ctx context.Context, mn mocknet.Mocknet, p tnet.Identity
}()
peer.BitswapServer = bs
peer.BitswapNetwork = bsNet
peer.Bitswap = true
peer.Protocol = multicodec.TransportBitswap
return peer, nil
}

Expand All @@ -238,7 +235,7 @@ func NewTestGraphsyncPeer(ctx context.Context, mn mocknet.Mocknet, p tnet.Identi
}

peer.DatatransferServer = dtRemote
peer.Graphsync = true
peer.Protocol = multicodec.TransportGraphsyncFilecoinv1
return peer, nil
}

Expand All @@ -253,18 +250,27 @@ func NewTestHttpPeer(ctx context.Context, mn mocknet.Mocknet, p tnet.Identity, t
peer.Host.Peerstore().AddAddr(p.ID(), httpAddr, 10*time.Minute) // TODO: Look into ttl duration?

go func() {
// Handle /ipfs/ endpoint
http.HandleFunc("/ipfs/", HttpHandler(ctx, peer.LinkSystem))

// Parse multiaddr IP and port, serve http server from address
addrParts := strings.Split(p.Address().String(), "/")
err := http.ListenAndServe(fmt.Sprintf("%s:%s", addrParts[2], addrParts[4]), nil)
peerHttpServer, err := NewTestPeerHttpServer(ctx, addrParts[2], addrParts[4])
if err != nil {
logger.Errorw("failed to make test peer http server", "err", err)
ctx.Done()
}
peer.HttpServer = peerHttpServer

// Handle custom /ipfs/ endpoint
peerHttpServer.Mux.HandleFunc("/ipfs/", MockIpfsHandler(ctx, peer.LinkSystem))

// Start the server
peerHttpServer.Start()
if err != http.ErrServerClosed {
logger.Errorw("failed to start mocknet peer http server", "err", err)
logger.Errorw("failed to start peer http server", "err", err)
ctx.Done()
}
}()

peer.Http = true
peer.Protocol = multicodec.TransportIpfsGatewayHttp
return peer, nil
}

Expand Down Expand Up @@ -330,7 +336,7 @@ func StartAndWaitForReady(ctx context.Context, manager datatransfer.Manager) err
}
}

func HttpHandler(ctx context.Context, lsys linking.LinkSystem) func(http.ResponseWriter, *http.Request) {
func MockIpfsHandler(ctx context.Context, lsys linking.LinkSystem) func(http.ResponseWriter, *http.Request) {
return func(res http.ResponseWriter, req *http.Request) {
urlPath := strings.Split(req.URL.Path, "/")[1:]

Expand All @@ -348,20 +354,27 @@ func HttpHandler(ctx context.Context, lsys linking.LinkSystem) func(http.Respons
unixfsPath = "/" + strings.Join(urlPath[2:], "/")
}

// We're always providing the car-scope parameter, so add a failure case if we stop
// providing it in the future
if !req.URL.Query().Has("car-scope") {
http.Error(res, "Missing car-scope parameter", http.StatusBadRequest)
return
}

// Parse car scope and use it to get selector
carScope := types.CarScopeAll
if req.URL.Query().Has("car-scope") {
switch req.URL.Query().Get("car-scope") {
case "all":
case "file":
carScope = types.CarScopeFile
case "block":
carScope = types.CarScopeBlock
default:
http.Error(res, fmt.Sprintf("Invalid car-scope parameter: %s", req.URL.Query().Get("car-scope")), http.StatusBadRequest)
return
}
var carScope types.CarScope
switch req.URL.Query().Get("car-scope") {
case "all":
carScope = types.CarScopeAll
case "file":
carScope = types.CarScopeFile
case "block":
carScope = types.CarScopeBlock
default:
http.Error(res, fmt.Sprintf("Invalid car-scope parameter: %s", req.URL.Query().Get("car-scope")), http.StatusBadRequest)
return
}

selNode := unixfsnode.UnixFSPathSelectorBuilder(unixfsPath, carScope.TerminalSelectorSpec(), false)
sel, err := selector.CompileSelector(selNode)
if err != nil {
Expand Down Expand Up @@ -395,7 +408,16 @@ func HttpHandler(ctx context.Context, lsys linking.LinkSystem) func(http.Respons
return bytes.NewReader(byts), nil
}

rootNode, err := lsys.Load(linking.LinkContext{}, cidlink.Link{Cid: rootCid}, basicnode.Prototype.Any)
protoChooser := dagpb.AddSupportToChooser(basicnode.Chooser)
lnk := cidlink.Link{Cid: rootCid}
lnkCtx := linking.LinkContext{}
proto, err := protoChooser(lnk, lnkCtx)
if err != nil {
http.Error(res, fmt.Sprintf("Failed to choose prototype node: %s", cidStr), http.StatusBadRequest)
return
}

rootNode, err := lsys.Load(lnkCtx, lnk, proto)
if err != nil {
http.Error(res, fmt.Sprintf("Failed to load root cid into link system: %v", err), http.StatusInternalServerError)
return
Expand All @@ -404,14 +426,16 @@ func HttpHandler(ctx context.Context, lsys linking.LinkSystem) func(http.Respons
cfg := &traversal.Config{
Ctx: ctx,
LinkSystem: lsys,
LinkTargetNodePrototypeChooser: basicnode.Chooser,
LinkTargetNodePrototypeChooser: protoChooser,
}
progress := traversal.Progress{Cfg: cfg}

err = progress.WalkAdv(rootNode, sel, func(p traversal.Progress, n datamodel.Node, vr traversal.VisitReason) error { return nil })
err = progress.WalkAdv(rootNode, sel, visitNoop)
if err != nil {
http.Error(res, fmt.Sprintf("Failed to traverse from root node: %v", err), http.StatusInternalServerError)
return
}
}
}

func visitNoop(p traversal.Progress, n datamodel.Node, vr traversal.VisitReason) error { return nil }
67 changes: 67 additions & 0 deletions pkg/internal/itest/testpeer/peerhttpserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package testpeer

import (
"context"
"fmt"
"net"
"net/http"

servertiming "github.com/mitchellh/go-server-timing"
)

type TestPeerHttpServer struct {
cancel context.CancelFunc
ctx context.Context
listener net.Listener
server *http.Server
Mux *http.ServeMux
}

// NewTestPeerHttpServer creates a new HttpServer
func NewTestPeerHttpServer(ctx context.Context, host string, port string) (*TestPeerHttpServer, error) {
addr := fmt.Sprintf("%s:%s", host, port)
listener, err := net.Listen("tcp", addr) // assigns a port if port is 0
if err != nil {
return nil, err
}

ctx, cancel := context.WithCancel(ctx)

// create server
mux := http.NewServeMux()
handler := servertiming.Middleware(mux, nil)
server := &http.Server{
Addr: addr,
BaseContext: func(listener net.Listener) context.Context { return ctx },
Handler: handler,
}

httpServer := &TestPeerHttpServer{
cancel: cancel,
ctx: ctx,
listener: listener,
server: server,
Mux: mux,
}

return httpServer, nil
}

// Start starts the http server, returning an error if the server failed to start
func (s *TestPeerHttpServer) Start() error {
logger.Infow("starting test peer http server", "listen_addr", s.listener.Addr())
err := s.server.Serve(s.listener)
if err != http.ErrServerClosed {
logger.Errorw("failed to start test peer http server", "err", err)
return err
}

return nil
}

// Close shutsdown the server and cancels the server context
func (s *TestPeerHttpServer) Close() error {
logger.Info("closing test peer http server")
s.cancel()
return s.server.Shutdown(context.Background())
}

0 comments on commit 5331ef7

Please sign in to comment.