diff --git a/pkg/internal/itest/http_fetch_test.go b/pkg/internal/itest/http_fetch_test.go index ae421c5e..16f834a2 100644 --- a/pkg/internal/itest/http_fetch_test.go +++ b/pkg/internal/itest/http_fetch_test.go @@ -56,6 +56,7 @@ func TestHttpFetch(t *testing.T) { name string graphsyncRemotes int bitswapRemotes int + httpRemotes int disableGraphsync bool expectFail bool modifyHttpConfig func(httpserver.HttpServerConfig) httpserver.HttpServerConfig @@ -78,6 +79,13 @@ func TestHttpFetch(t *testing.T) { return []unixfs.DirEntry{unixfs.GenerateFile(t, remotes[0].LinkSystem, rndReader, 4<<20)} }, }, + { + name: "http large sharded file", + httpRemotes: 1, + generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry { + return []unixfs.DirEntry{unixfs.GenerateFile(t, &remotes[0].LinkSystem, rndReader, 4<<20)} + }, + }, { name: "graphsync large directory", graphsyncRemotes: 1, @@ -92,6 +100,13 @@ func TestHttpFetch(t *testing.T) { return []unixfs.DirEntry{unixfs.GenerateDirectory(t, remotes[0].LinkSystem, rndReader, 16<<20, false)} }, }, + { + name: "http large directory", + httpRemotes: 1, + generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry { + return []unixfs.DirEntry{unixfs.GenerateDirectory(t, &remotes[0].LinkSystem, rndReader, 16<<20, false)} + }, + }, { name: "graphsync large sharded directory", graphsyncRemotes: 1, @@ -106,6 +121,13 @@ func TestHttpFetch(t *testing.T) { return []unixfs.DirEntry{unixfs.GenerateDirectory(t, remotes[0].LinkSystem, rndReader, 16<<20, true)} }, }, + { + name: "http large sharded directory", + httpRemotes: 1, + generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry { + return []unixfs.DirEntry{unixfs.GenerateDirectory(t, &remotes[0].LinkSystem, rndReader, 16<<20, true)} + }, + }, { name: "graphsync max block limit", graphsyncRemotes: 1, @@ -636,6 +658,8 @@ func TestHttpFetch(t *testing.T) { finishedChans = append(finishedChans, mocknet.SetupRetrieval(t, r)) } mrn.AddBitswapPeers(testCase.bitswapRemotes) + mrn.AddHttpPeers(testCase.httpRemotes) + require.NoError(t, mrn.MN.LinkAll()) carFiles := debugRemotes(t, ctx, testCase.name, mrn.Remotes) diff --git a/pkg/internal/itest/mocknet/mocknet.go b/pkg/internal/itest/mocknet/mocknet.go index b3278c96..87b5d4f7 100644 --- a/pkg/internal/itest/mocknet/mocknet.go +++ b/pkg/internal/itest/mocknet/mocknet.go @@ -22,6 +22,7 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" lpmock "github.com/libp2p/go-libp2p/p2p/net/mock" + "github.com/multiformats/go-multicodec" "github.com/stretchr/testify/require" ) @@ -62,22 +63,25 @@ func NewMockRetrievalNet(ctx context.Context, t *testing.T) *MockRetrievalNet { } func (mrn *MockRetrievalNet) AddBitswapPeers(n int) { - peers := mrn.testPeerGenerator.BitswapPeers(n) - for i := 0; i < n; i++ { - mrn.Remotes = append(mrn.Remotes, peers[i]) - mrn.RemoteEvents = append(mrn.RemoteEvents, make([]datatransfer.Event, 0)) // not used for bitswap - mrn.FinishedChan = append(mrn.FinishedChan, make(chan struct{}, 1)) // not used for bitswap - } + mrn.addPeers(mrn.testPeerGenerator.BitswapPeers(n)) } func (mrn *MockRetrievalNet) AddGraphsyncPeers(n int) { - peers := mrn.testPeerGenerator.GraphsyncPeers(n) - for i := 0; i < n; i++ { + mrn.addPeers(mrn.testPeerGenerator.GraphsyncPeers(n)) +} + +func (mrn *MockRetrievalNet) AddHttpPeers(n int) { + mrn.addPeers(mrn.testPeerGenerator.HttpPeers(n)) +} + +func (mrn *MockRetrievalNet) addPeers(peers []testpeer.TestPeer) { + for i := 0; i < len(peers); i++ { mrn.Remotes = append(mrn.Remotes, peers[i]) mrn.RemoteEvents = append(mrn.RemoteEvents, make([]datatransfer.Event, 0)) mrn.FinishedChan = append(mrn.FinishedChan, make(chan struct{}, 1)) } } + func SetupRetrieval(t *testing.T, remote testpeer.TestPeer) chan []datatransfer.Event { // Register DealProposal voucher type with automatic Pull acceptance remoteDealValidator := &mockDealValidator{t: t, acceptPull: true} @@ -129,6 +133,9 @@ func (mrn *MockRetrievalNet) TearDown() error { if h.BitswapNetwork != nil { h.BitswapNetwork.Stop() } + if h.HttpServer != nil { + h.HttpServer.Close() + } }(h) } wg.Wait() @@ -144,10 +151,13 @@ func (mcf *mockCandidateFinder) FindCandidates(ctx context.Context, cid cid.Cid) for _, h := range mcf.mrn.Remotes { if _, has := h.Cids[cid]; has { var md metadata.Metadata - if h.BitswapServer != nil { + switch h.Protocol { + case multicodec.TransportBitswap: md = metadata.Default.New(metadata.Bitswap{}) - } else { + case multicodec.TransportGraphsyncFilecoinv1: md = metadata.Default.New(&metadata.GraphsyncFilecoinV1{PieceCID: cid}) + case multicodec.TransportIpfsGatewayHttp: + md = metadata.Default.New(&metadata.IpfsGatewayHttp{}) } candidates = append(candidates, types.RetrievalCandidate{MinerPeer: *h.AddrInfo(), RootCid: cid, Metadata: md}) } diff --git a/pkg/internal/itest/testpeer/generator.go b/pkg/internal/itest/testpeer/generator.go index 03863d72..e45df226 100644 --- a/pkg/internal/itest/testpeer/generator.go +++ b/pkg/internal/itest/testpeer/generator.go @@ -1,9 +1,12 @@ package testpeer import ( + "bytes" "context" "fmt" "io" + "net/http" + "strings" "testing" "time" @@ -11,6 +14,7 @@ import ( dtimpl "github.com/filecoin-project/go-data-transfer/v2/impl" dtnet "github.com/filecoin-project/go-data-transfer/v2/network" gstransport "github.com/filecoin-project/go-data-transfer/v2/transport/graphsync" + "github.com/filecoin-project/lassie/pkg/types" "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" delayed "github.com/ipfs/go-datastore/delayed" @@ -22,18 +26,31 @@ import ( delay "github.com/ipfs/go-ipfs-delay" bsnet "github.com/ipfs/go-libipfs/bitswap/network" "github.com/ipfs/go-libipfs/bitswap/server" + "github.com/ipfs/go-log/v2" + "github.com/ipfs/go-unixfsnode" + "github.com/ipld/go-car/v2" + "github.com/ipld/go-car/v2/storage" + dagpb "github.com/ipld/go-codec-dagpb" "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/datamodel" "github.com/ipld/go-ipld-prime/linking" cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/node/basicnode" + "github.com/ipld/go-ipld-prime/traversal" + "github.com/ipld/go-ipld-prime/traversal/selector" routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" tnet "github.com/libp2p/go-libp2p-testing/net" p2ptestutil "github.com/libp2p/go-libp2p-testing/netutil" "github.com/libp2p/go-libp2p/core/host" peer "github.com/libp2p/go-libp2p/core/peer" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + ma "github.com/multiformats/go-multiaddr" + "github.com/multiformats/go-multicodec" "github.com/stretchr/testify/require" ) +var logger = log.Logger("lassie/mocknet") + // NewTestPeerGenerator generates a new TestPeerGenerator for the given // mocknet func NewTestPeerGenerator(ctx context.Context, t *testing.T, mn mocknet.Mocknet, netOptions []bsnet.NetOpt, bsOptions []server.Option) TestPeerGenerator { @@ -67,7 +84,7 @@ func (g *TestPeerGenerator) Close() error { return nil // for Closer interface } -// Next generates a new test peer with bitswap + dependencies +// NextBitswap generates a new test peer with bitswap + dependencies func (g *TestPeerGenerator) NextBitswap() TestPeer { g.seq++ p, err := p2ptestutil.RandTestBogusIdentity() @@ -77,7 +94,7 @@ func (g *TestPeerGenerator) NextBitswap() TestPeer { return tp } -// Next generates a new test peer with graphsync + dependencies +// NextGraphsync generates a new test peer with graphsync + dependencies func (g *TestPeerGenerator) NextGraphsync() TestPeer { g.seq++ p, err := p2ptestutil.RandTestBogusIdentity() @@ -87,7 +104,17 @@ func (g *TestPeerGenerator) NextGraphsync() TestPeer { return tp } -// Peers creates N test peers with bitswap + dependencies +// NextHttp generates a new test peer with http + dependencies +func (g *TestPeerGenerator) NextHttp() TestPeer { + g.seq++ + p, err := p2ptestutil.RandTestBogusIdentity() + require.NoError(g.t, err) + tp, err := NewTestHttpPeer(g.ctx, g.mn, p, g.t) + require.NoError(g.t, err) + return tp +} + +// BitswapPeers creates N test peers with bitswap + dependencies func (g *TestPeerGenerator) BitswapPeers(n int) []TestPeer { var instances []TestPeer for j := 0; j < n; j++ { @@ -97,7 +124,7 @@ func (g *TestPeerGenerator) BitswapPeers(n int) []TestPeer { return instances } -// Peers creates N test peers with bitswap + dependencies +// GraphsyncPeers creates N test peers with graphsync + dependencies func (g *TestPeerGenerator) GraphsyncPeers(n int) []TestPeer { var instances []TestPeer for j := 0; j < n; j++ { @@ -107,6 +134,16 @@ func (g *TestPeerGenerator) GraphsyncPeers(n int) []TestPeer { return instances } +// HttpPeers creates N test peers with http + dependencies +func (g *TestPeerGenerator) HttpPeers(n int) []TestPeer { + var instances []TestPeer + for j := 0; j < n; j++ { + inst := g.NextHttp() + instances = append(instances, inst) + } + return instances +} + // ConnectPeers connects the given peers to each other func ConnectPeers(instances []TestPeer) { for i, inst := range instances { @@ -126,11 +163,13 @@ type TestPeer struct { BitswapServer *server.Server BitswapNetwork bsnet.BitSwapNetwork DatatransferServer datatransfer.Manager + HttpServer *TestPeerHttpServer blockstore blockstore.Blockstore Host host.Host blockstoreDelay delay.D LinkSystem *linking.LinkSystem Cids map[cid.Cid]struct{} + Protocol multicodec.Code } // Blockstore returns the block store for this test instance @@ -151,7 +190,7 @@ func (i TestPeer) AddrInfo() *peer.AddrInfo { } } -// NewTestPeer creates a test peer instance. +// NewTestBitswapPeer creates a test peer instance. // // NB: It's easy make mistakes by providing the same peer ID to two different // instances. To safeguard, use the InstanceGenerator to generate instances. It's @@ -170,6 +209,7 @@ func NewTestBitswapPeer(ctx context.Context, mn mocknet.Mocknet, p tnet.Identity }() peer.BitswapServer = bs peer.BitswapNetwork = bsNet + peer.Protocol = multicodec.TransportBitswap return peer, nil } @@ -195,6 +235,42 @@ func NewTestGraphsyncPeer(ctx context.Context, mn mocknet.Mocknet, p tnet.Identi } peer.DatatransferServer = dtRemote + peer.Protocol = multicodec.TransportGraphsyncFilecoinv1 + return peer, nil +} + +func NewTestHttpPeer(ctx context.Context, mn mocknet.Mocknet, p tnet.Identity, t *testing.T) (TestPeer, error) { + peer, _, err := newTestPeer(ctx, mn, p) + if err != nil { + return TestPeer{}, err + } + + // Create http multiaddr from random peer addr and add it to the peer's addreses + httpAddr := p.Address().Encapsulate(ma.StringCast("/http")) + peer.Host.Peerstore().AddAddr(p.ID(), httpAddr, 10*time.Minute) // TODO: Look into ttl duration? + + go func() { + // Parse multiaddr IP and port, serve http server from address + addrParts := strings.Split(p.Address().String(), "/") + peerHttpServer, err := NewTestPeerHttpServer(ctx, addrParts[2], addrParts[4]) + if err != nil { + logger.Errorw("failed to make test peer http server", "err", err) + ctx.Done() + } + peer.HttpServer = peerHttpServer + + // Handle custom /ipfs/ endpoint + peerHttpServer.Mux.HandleFunc("/ipfs/", MockIpfsHandler(ctx, peer.LinkSystem)) + + // Start the server + peerHttpServer.Start() + if err != http.ErrServerClosed { + logger.Errorw("failed to start peer http server", "err", err) + ctx.Done() + } + }() + + peer.Protocol = multicodec.TransportIpfsGatewayHttp return peer, nil } @@ -260,3 +336,107 @@ func StartAndWaitForReady(ctx context.Context, manager datatransfer.Manager) err return err } } + +func MockIpfsHandler(ctx context.Context, lsys linking.LinkSystem) func(http.ResponseWriter, *http.Request) { + return func(res http.ResponseWriter, req *http.Request) { + urlPath := strings.Split(req.URL.Path, "/")[1:] + + // validate CID path parameter + cidStr := urlPath[1] + rootCid, err := cid.Parse(cidStr) + if err != nil { + http.Error(res, fmt.Sprintf("Failed to parse CID path parameter: %s", cidStr), http.StatusBadRequest) + return + } + + // Grab unixfs path if it exists + unixfsPath := "" + if len(urlPath) > 2 { + unixfsPath = "/" + strings.Join(urlPath[2:], "/") + } + + // We're always providing the car-scope parameter, so add a failure case if we stop + // providing it in the future + if !req.URL.Query().Has("car-scope") { + http.Error(res, "Missing car-scope parameter", http.StatusBadRequest) + return + } + + // Parse car scope and use it to get selector + var carScope types.CarScope + switch req.URL.Query().Get("car-scope") { + case "all": + carScope = types.CarScopeAll + case "file": + carScope = types.CarScopeFile + case "block": + carScope = types.CarScopeBlock + default: + http.Error(res, fmt.Sprintf("Invalid car-scope parameter: %s", req.URL.Query().Get("car-scope")), http.StatusBadRequest) + return + } + + selNode := unixfsnode.UnixFSPathSelectorBuilder(unixfsPath, carScope.TerminalSelectorSpec(), false) + sel, err := selector.CompileSelector(selNode) + if err != nil { + http.Error(res, fmt.Sprintf("Failed to compile selector from car-scope: %v", err), http.StatusInternalServerError) + return + } + + // Write to response writer + carWriter, err := storage.NewWritable(res, []cid.Cid{rootCid}, car.WriteAsCarV1(true), car.AllowDuplicatePuts(false)) + if err != nil { + http.Error(res, fmt.Sprintf("Failed to create car writer: %v", err), http.StatusInternalServerError) + return + } + + // Extend the StorageReadOpener func to write to the carWriter + originalSRO := lsys.StorageReadOpener + lsys.StorageReadOpener = func(lc linking.LinkContext, lnk datamodel.Link) (io.Reader, error) { + r, err := originalSRO(lc, lnk) + if err != nil { + return nil, err + } + byts, err := io.ReadAll(r) + if err != nil { + return nil, err + } + err = carWriter.Put(ctx, lnk.(cidlink.Link).Cid.KeyString(), byts) + if err != nil { + return nil, err + } + + return bytes.NewReader(byts), nil + } + + protoChooser := dagpb.AddSupportToChooser(basicnode.Chooser) + lnk := cidlink.Link{Cid: rootCid} + lnkCtx := linking.LinkContext{} + proto, err := protoChooser(lnk, lnkCtx) + if err != nil { + http.Error(res, fmt.Sprintf("Failed to choose prototype node: %s", cidStr), http.StatusBadRequest) + return + } + + rootNode, err := lsys.Load(lnkCtx, lnk, proto) + if err != nil { + http.Error(res, fmt.Sprintf("Failed to load root cid into link system: %v", err), http.StatusInternalServerError) + return + } + + cfg := &traversal.Config{ + Ctx: ctx, + LinkSystem: lsys, + LinkTargetNodePrototypeChooser: protoChooser, + } + progress := traversal.Progress{Cfg: cfg} + + err = progress.WalkAdv(rootNode, sel, visitNoop) + if err != nil { + http.Error(res, fmt.Sprintf("Failed to traverse from root node: %v", err), http.StatusInternalServerError) + return + } + } +} + +func visitNoop(p traversal.Progress, n datamodel.Node, vr traversal.VisitReason) error { return nil } diff --git a/pkg/internal/itest/testpeer/peerhttpserver.go b/pkg/internal/itest/testpeer/peerhttpserver.go new file mode 100644 index 00000000..54588c2e --- /dev/null +++ b/pkg/internal/itest/testpeer/peerhttpserver.go @@ -0,0 +1,67 @@ +package testpeer + +import ( + "context" + "fmt" + "net" + "net/http" + + servertiming "github.com/mitchellh/go-server-timing" +) + +type TestPeerHttpServer struct { + cancel context.CancelFunc + ctx context.Context + listener net.Listener + server *http.Server + Mux *http.ServeMux +} + +// NewTestPeerHttpServer creates a new HttpServer +func NewTestPeerHttpServer(ctx context.Context, host string, port string) (*TestPeerHttpServer, error) { + addr := fmt.Sprintf("%s:%s", host, port) + listener, err := net.Listen("tcp", addr) // assigns a port if port is 0 + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(ctx) + + // create server + mux := http.NewServeMux() + handler := servertiming.Middleware(mux, nil) + server := &http.Server{ + Addr: addr, + BaseContext: func(listener net.Listener) context.Context { return ctx }, + Handler: handler, + } + + httpServer := &TestPeerHttpServer{ + cancel: cancel, + ctx: ctx, + listener: listener, + server: server, + Mux: mux, + } + + return httpServer, nil +} + +// Start starts the http server, returning an error if the server failed to start +func (s *TestPeerHttpServer) Start() error { + logger.Infow("starting test peer http server", "listen_addr", s.listener.Addr()) + err := s.server.Serve(s.listener) + if err != http.ErrServerClosed { + logger.Errorw("failed to start test peer http server", "err", err) + return err + } + + return nil +} + +// Close shutsdown the server and cancels the server context +func (s *TestPeerHttpServer) Close() error { + logger.Info("closing test peer http server") + s.cancel() + return s.server.Shutdown(context.Background()) +}