Skip to content

Commit

Permalink
Fix Pull Request Cancelling (#67)
Browse files Browse the repository at this point in the history
* fix(graphsync): fix lock on cancel

Fix a lock held to long that was causing an error for closing a data transfer channel

* fix(lint): fix lint & imports
  • Loading branch information
hannahhoward committed Aug 21, 2020
1 parent b255b78 commit c6b4d74
Show file tree
Hide file tree
Showing 4 changed files with 265 additions and 9 deletions.
11 changes: 7 additions & 4 deletions channels/channels_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,29 +46,32 @@ var ChannelEvents = fsm.Events{
fsm.Event(datatransfer.PauseInitiator).
FromMany(datatransfer.Requested, datatransfer.Ongoing).To(datatransfer.InitiatorPaused).
From(datatransfer.ResponderPaused).To(datatransfer.BothPaused).
FromAny().ToNoChange(),
FromAny().ToJustRecord(),
fsm.Event(datatransfer.PauseResponder).
FromMany(datatransfer.Requested, datatransfer.Ongoing).To(datatransfer.ResponderPaused).
From(datatransfer.InitiatorPaused).To(datatransfer.BothPaused).
FromAny().ToNoChange(),
FromAny().ToJustRecord(),
fsm.Event(datatransfer.ResumeInitiator).
From(datatransfer.InitiatorPaused).To(datatransfer.Ongoing).
From(datatransfer.BothPaused).To(datatransfer.ResponderPaused).
FromAny().ToNoChange(),
FromAny().ToJustRecord(),
fsm.Event(datatransfer.ResumeResponder).
From(datatransfer.ResponderPaused).To(datatransfer.Ongoing).
From(datatransfer.BothPaused).To(datatransfer.InitiatorPaused).
From(datatransfer.Finalizing).To(datatransfer.Completing).
FromAny().ToNoChange(),
FromAny().ToJustRecord(),
fsm.Event(datatransfer.FinishTransfer).
FromAny().To(datatransfer.TransferFinished).
FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord().
From(datatransfer.ResponderCompleted).To(datatransfer.Completing).
From(datatransfer.ResponderFinalizing).To(datatransfer.ResponderFinalizingTransferFinished),
fsm.Event(datatransfer.ResponderBeginsFinalization).
FromAny().To(datatransfer.ResponderFinalizing).
FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord().
From(datatransfer.TransferFinished).To(datatransfer.ResponderFinalizingTransferFinished),
fsm.Event(datatransfer.ResponderCompletes).
FromAny().To(datatransfer.ResponderCompleted).
FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord().
From(datatransfer.ResponderPaused).To(datatransfer.ResponderFinalizing).
From(datatransfer.TransferFinished).To(datatransfer.Completing).
From(datatransfer.ResponderFinalizing).To(datatransfer.ResponderCompleted).
Expand Down
252 changes: 252 additions & 0 deletions impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,23 @@ import (
"github.com/ipfs/go-datastore/namespace"
dss "github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-graphsync"
gsimpl "github.com/ipfs/go-graphsync/impl"
gsmsg "github.com/ipfs/go-graphsync/message"
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/storeutil"
bstore "github.com/ipfs/go-ipfs-blockstore"
offline "github.com/ipfs/go-ipfs-exchange-offline"
ipldformat "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-storedcounter"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/encoding"
. "github.com/filecoin-project/go-data-transfer/impl"
Expand Down Expand Up @@ -333,6 +338,253 @@ func TestMultipleRoundTripMultipleStores(t *testing.T) {
}
}

func TestManyReceiversAtOnce(t *testing.T) {
ctx := context.Background()
testCases := map[string]struct {
isPull bool
receiverCount int
}{
"multiple receivers for push requests": {
receiverCount: 10,
},
"multiple receivers for pull requests": {
isPull: true,
receiverCount: 10,
},
}
for testCase, data := range testCases {
t.Run(testCase, func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

gsData := testutil.NewGraphsyncTestingData(ctx, t)
host1 := gsData.Host1 // initiator, data sender

tp1 := gsData.SetupGSTransportHost1()
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.DtNet1, tp1, gsData.StoredCounter1)
require.NoError(t, err)
err = dt1.Start(ctx)
require.NoError(t, err)

destDagServices := make([]ipldformat.DAGService, 0, data.receiverCount)
receivers := make([]datatransfer.Manager, 0, data.receiverCount)
hosts := make([]host.Host, 0, data.receiverCount)
for i := 0; i < data.receiverCount; i++ {
host, err := gsData.Mn.GenPeer()
require.NoError(t, err, "error generating host")
gsnet := gsnet.NewFromLibp2pHost(host)
dtnet := network.NewFromLibp2pHost(host)
ds := dss.MutexWrap(datastore.NewMapDatastore())
bs := bstore.NewBlockstore(namespace.Wrap(ds, datastore.NewKey("blockstore")))
altBs := bstore.NewBlockstore(namespace.Wrap(ds, datastore.NewKey("altstore")))

loader := storeutil.LoaderForBlockstore(bs)
storer := storeutil.StorerForBlockstore(bs)
altLoader := storeutil.LoaderForBlockstore(altBs)
altStorer := storeutil.StorerForBlockstore(altBs)

destDagService := merkledag.NewDAGService(blockservice.New(altBs, offline.Exchange(altBs)))

gs := gsimpl.New(gsData.Ctx, gsnet, loader, storer)
gsTransport := tp.NewTransport(host.ID(), gs)

dtDs := namespace.Wrap(ds, datastore.NewKey("datatransfer"))

storedCounter := storedcounter.New(ds, datastore.NewKey("counter"))

receiver, err := NewDataTransfer(dtDs, dtnet, gsTransport, storedCounter)
require.NoError(t, err)
err = receiver.Start(gsData.Ctx)
require.NoError(t, err)

err = receiver.RegisterTransportConfigurer(&testutil.FakeDTType{}, func(channelID datatransfer.ChannelID, testVoucher datatransfer.Voucher, transport datatransfer.Transport) {
_, isFv := testVoucher.(*testutil.FakeDTType)
gsTransport, isGs := transport.(*tp.Transport)
if isFv && isGs {
err := gsTransport.UseStore(channelID, altLoader, altStorer)
require.NoError(t, err)
}
})
require.NoError(t, err)

destDagServices = append(destDagServices, destDagService)
receivers = append(receivers, receiver)
hosts = append(hosts, host)
}
err = gsData.Mn.LinkAll()
require.NoError(t, err, "error linking hosts")

finished := make(chan struct{}, 2*data.receiverCount)
errChan := make(chan string, 2*data.receiverCount)
opened := make(chan struct{}, 2*data.receiverCount)
var subscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) {
if channelState.Status() == datatransfer.Completed {
finished <- struct{}{}
}
if event.Code == datatransfer.Error {
errChan <- event.Message
}
if event.Code == datatransfer.Open {
opened <- struct{}{}
}
}
dt1.SubscribeToEvents(subscriber)
for _, receiver := range receivers {
receiver.SubscribeToEvents(subscriber)
}
vouchers := make([]datatransfer.Voucher, 0, data.receiverCount)
for i := 0; i < data.receiverCount; i++ {
vouchers = append(vouchers, testutil.NewFakeDTType())
}
sv := testutil.NewStubbedValidator()

root, origBytes := testutil.LoadUnixFSFile(ctx, t, gsData.DagService1)
rootCid := root.(cidlink.Link).Cid

if data.isPull {
sv.ExpectSuccessPull()
require.NoError(t, dt1.RegisterVoucherType(&testutil.FakeDTType{}, sv))
for i, receiver := range receivers {
_, err = receiver.OpenPullDataChannel(ctx, host1.ID(), vouchers[i], rootCid, gsData.AllSelector)
require.NoError(t, err)
}
} else {
sv.ExpectSuccessPush()
for i, receiver := range receivers {
require.NoError(t, receiver.RegisterVoucherType(&testutil.FakeDTType{}, sv))
_, err = dt1.OpenPushDataChannel(ctx, hosts[i].ID(), vouchers[i], rootCid, gsData.AllSelector)
require.NoError(t, err)
}
}
opens := 0
completes := 0
for opens < 2*data.receiverCount || completes < 2*data.receiverCount {
select {
case <-ctx.Done():
t.Fatal("Did not complete succcessful data transfer")
case <-finished:
completes++
case <-opened:
opens++
case err := <-errChan:
t.Fatalf("received error on data transfer: %s", err)
}
}
for _, destDagService := range destDagServices {
testutil.VerifyHasFile(ctx, t, destDagService, root, origBytes)
}
})
}
}

func TestRoundTripCancelledRequest(t *testing.T) {
ctx := context.Background()
testCases := map[string]struct {
isPull bool
}{
"cancelled push request": {},
"cancelled pull request": {
isPull: true,
},
}
for testCase, data := range testCases {
t.Run(testCase, func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

gsData := testutil.NewGraphsyncTestingData(ctx, t)
host1 := gsData.Host1 // initiator, data sender
host2 := gsData.Host2

tp1 := gsData.SetupGSTransportHost1()
tp2 := gsData.SetupGSTransportHost2()

dt1, err := NewDataTransfer(gsData.DtDs1, gsData.DtNet1, tp1, gsData.StoredCounter1)
require.NoError(t, err)
err = dt1.Start(ctx)
require.NoError(t, err)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.DtNet2, tp2, gsData.StoredCounter2)
require.NoError(t, err)
err = dt2.Start(ctx)
require.NoError(t, err)

finished := make(chan struct{}, 2)
errChan := make(chan string, 2)
cancelled := make(chan struct{}, 2)
accepted := make(chan struct{}, 2)
opened := make(chan struct{}, 2)
var subscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) {
if channelState.Status() == datatransfer.Completed {
finished <- struct{}{}
}
if event.Code == datatransfer.Accept {
accepted <- struct{}{}
}
if event.Code == datatransfer.Error {
errChan <- event.Message
}
if event.Code == datatransfer.Cancel {
cancelled <- struct{}{}
}
if event.Code == datatransfer.Open {
opened <- struct{}{}
}
}
dt1.SubscribeToEvents(subscriber)
dt2.SubscribeToEvents(subscriber)
voucher := testutil.FakeDTType{Data: "applesauce"}
sv := testutil.NewStubbedValidator()
root, _ := testutil.LoadUnixFSFile(ctx, t, gsData.DagService1)
rootCid := root.(cidlink.Link).Cid

var chid datatransfer.ChannelID
if data.isPull {
sv.ExpectPausePull()
require.NoError(t, dt1.RegisterVoucherType(&testutil.FakeDTType{}, sv))
chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), &voucher, rootCid, gsData.AllSelector)
} else {
sv.ExpectPausePush()
require.NoError(t, dt2.RegisterVoucherType(&testutil.FakeDTType{}, sv))
chid, err = dt1.OpenPushDataChannel(ctx, host2.ID(), &voucher, rootCid, gsData.AllSelector)
}
require.NoError(t, err)
opens := 0
cancels := 0
accepts := 0
for opens < 2 || cancels < 2 {
select {
case <-ctx.Done():
t.Fatal("Did not finish data transfer")
case <-finished:
t.Fatal("request completed succussfully but should have been cancelled")
case <-opened:
opens++
case <-cancelled:
cancels++
case <-accepted:
if accepts == 0 {
timer := time.NewTimer(10 * time.Millisecond)
go func() {
select {
case <-ctx.Done():
case <-timer.C:
if data.isPull {
_ = dt1.CloseDataTransferChannel(ctx, chid)
} else {
_ = dt2.CloseDataTransferChannel(ctx, chid)
}
}
}()
}
accepts++
case err := <-errChan:
t.Fatalf("received error on data transfer: %s", err)
}
}
})
}
}

type retrievalRevalidator struct {
*testutil.StubbedRevalidator
dataSoFar uint64
Expand Down
9 changes: 5 additions & 4 deletions testutil/gstestdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ const unixfsLinksPerLevel = 1024
// graphsync
type GraphsyncTestingData struct {
Ctx context.Context
Mn mocknet.Mocknet
StoredCounter1 *storedcounter.StoredCounter
StoredCounter2 *storedcounter.StoredCounter
DtDs1 datastore.Datastore
Expand Down Expand Up @@ -111,17 +112,17 @@ func NewGraphsyncTestingData(ctx context.Context, t *testing.T) *GraphsyncTestin
gsData.Loader2 = storeutil.LoaderForBlockstore(gsData.Bs2)
gsData.Storer2 = storeutil.StorerForBlockstore(gsData.Bs2)

mn := mocknet.New(ctx)
gsData.Mn = mocknet.New(ctx)

// setup network
var err error
gsData.Host1, err = mn.GenPeer()
gsData.Host1, err = gsData.Mn.GenPeer()
require.NoError(t, err)

gsData.Host2, err = mn.GenPeer()
gsData.Host2, err = gsData.Mn.GenPeer()
require.NoError(t, err)

err = mn.LinkAll()
err = gsData.Mn.LinkAll()
require.NoError(t, err)

gsData.GsNet1 = gsnet.NewFromLibp2pHost(gsData.Host1)
Expand Down
2 changes: 1 addition & 1 deletion transport/graphsync/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,10 @@ func (t *Transport) CloseChannel(ctx context.Context, chid datatransfer.ChannelI
return nil
}
t.dataLock.Lock()
defer t.dataLock.Unlock()
if _, ok := t.requestorCancelledMap[chid]; ok {
return nil
}
t.dataLock.Unlock()
return t.gs.CancelResponse(gsKey.p, gsKey.requestID)
}

Expand Down

0 comments on commit c6b4d74

Please sign in to comment.