diff --git a/channels/channels_fsm.go b/channels/channels_fsm.go index cf20f870..b12cde15 100644 --- a/channels/channels_fsm.go +++ b/channels/channels_fsm.go @@ -60,20 +60,29 @@ var ChannelEvents = fsm.Events{ chst.AddLog("received data") return nil }), - fsm.Event(datatransfer.DataSent).FromMany(transferringStates...).ToNoChange().Action(func(chst *internal.ChannelState) error { - chst.AddLog("") - return nil - }), + + fsm.Event(datatransfer.DataSent). + FromMany(transferringStates...).ToNoChange(). + From(datatransfer.TransferFinished).ToNoChange(). + Action(func(chst *internal.ChannelState) error { + chst.AddLog("") + return nil + }), + fsm.Event(datatransfer.DataSentProgress).FromMany(transferringStates...).ToNoChange(). Action(func(chst *internal.ChannelState, delta uint64) error { chst.Sent += delta chst.AddLog("sending data") return nil }), - fsm.Event(datatransfer.DataQueued).FromMany(transferringStates...).ToNoChange().Action(func(chst *internal.ChannelState) error { - chst.AddLog("") - return nil - }), + + fsm.Event(datatransfer.DataQueued). + FromMany(transferringStates...).ToNoChange(). + From(datatransfer.TransferFinished).ToNoChange(). + Action(func(chst *internal.ChannelState) error { + chst.AddLog("") + return nil + }), fsm.Event(datatransfer.DataQueuedProgress).FromMany(transferringStates...).ToNoChange(). Action(func(chst *internal.ChannelState, delta uint64) error { chst.Queued += delta diff --git a/channels/channels_test.go b/channels/channels_test.go index 82c8bc6b..11328029 100644 --- a/channels/channels_test.go +++ b/channels/channels_test.go @@ -138,6 +138,38 @@ func TestChannels(t *testing.T) { require.Equal(t, state.Status(), datatransfer.Ongoing) }) + t.Run("datasent/queued when transfer is already finished", func(t *testing.T) { + ds := dss.MutexWrap(datastore.NewMapDatastore()) + dir := os.TempDir() + cidLists, err := cidlists.NewCIDLists(dir) + require.NoError(t, err) + channelList, err := channels.New(ds, cidLists, notifier, decoderByType, decoderByType, &fakeEnv{}, peers[0]) + require.NoError(t, err) + err = channelList.Start(ctx) + require.NoError(t, err) + + chid, err := channelList.CreateNew(peers[0], tid1, cids[0], selector, fv1, peers[0], peers[0], peers[1]) + require.NoError(t, err) + checkEvent(ctx, t, received, datatransfer.Open) + + // move the channel to `TransferFinished` state. + require.NoError(t, channelList.FinishTransfer(chid)) + state := checkEvent(ctx, t, received, datatransfer.FinishTransfer) + require.Equal(t, datatransfer.TransferFinished, state.Status()) + + // send a data-sent event and ensure it's a no-op + _, err = channelList.DataSent(chid, cids[1], 1) + require.NoError(t, err) + state = checkEvent(ctx, t, received, datatransfer.DataSent) + require.Equal(t, datatransfer.TransferFinished, state.Status()) + + // send a data-queued event and ensure it's a no-op. + _, err = channelList.DataQueued(chid, cids[1], 1) + require.NoError(t, err) + state = checkEvent(ctx, t, received, datatransfer.DataQueued) + require.Equal(t, datatransfer.TransferFinished, state.Status()) + }) + t.Run("updating send/receive values", func(t *testing.T) { ds := dss.MutexWrap(datastore.NewMapDatastore()) dir := os.TempDir()