Skip to content

Commit

Permalink
add ChannelStage to keep track of lifecycle of DataTransfer
Browse files Browse the repository at this point in the history
  • Loading branch information
nonsense committed Mar 18, 2021
1 parent 42e0a5b commit bda3a51
Show file tree
Hide file tree
Showing 7 changed files with 497 additions and 23 deletions.
5 changes: 5 additions & 0 deletions channels/channel_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type channelState struct {
voucherResultDecoder DecoderByTypeFunc
voucherDecoder DecoderByTypeFunc
channelCIDsReader ChannelCIDsReader

stages *datatransfer.ChannelStages
}

// EmptyChannelState is the zero value for channel state, meaning not present
Expand Down Expand Up @@ -171,6 +173,8 @@ func (c channelState) OtherPeer() peer.ID {
return c.sender
}

func (c channelState) Stages() *datatransfer.ChannelStages { return c.stages }

func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByTypeFunc, voucherResultDecoder DecoderByTypeFunc, channelCIDsReader ChannelCIDsReader) datatransfer.ChannelState {
return channelState{
selfPeer: c.SelfPeer,
Expand All @@ -191,6 +195,7 @@ func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByT
voucherResultDecoder: voucherResultDecoder,
voucherDecoder: voucherDecoder,
channelCIDsReader: channelCIDsReader,
stages: c.Stages,
}
}

Expand Down
1 change: 1 addition & 0 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func (c *Channels) CreateNew(selfPeer peer.ID, tid datatransfer.TransferID, base
Selector: &cbg.Deferred{Raw: selBytes},
Sender: dataSender,
Recipient: dataReceiver,
Stages: &datatransfer.ChannelStages{},
Vouchers: []internal.EncodedVoucher{
{
Type: voucher.Type(),
Expand Down
97 changes: 76 additions & 21 deletions channels/channels_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,96 +24,151 @@ var transferringStates = []fsm.StateKey{

// ChannelEvents describe the events taht can
var ChannelEvents = fsm.Events{
fsm.Event(datatransfer.Open).FromAny().To(datatransfer.Requested),
fsm.Event(datatransfer.Accept).From(datatransfer.Requested).To(datatransfer.Ongoing),
fsm.Event(datatransfer.Open).FromAny().To(datatransfer.Requested).Action(func(chst *internal.ChannelState) error {
chst.AddLog("")
return nil
}),
fsm.Event(datatransfer.Accept).From(datatransfer.Requested).To(datatransfer.Ongoing).Action(func(chst *internal.ChannelState) error {
chst.AddLog("")
return nil
}),
fsm.Event(datatransfer.Restart).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
chst.Message = ""
chst.AddLog("")
return nil
}),
fsm.Event(datatransfer.Cancel).FromAny().To(datatransfer.Cancelling).Action(func(chst *internal.ChannelState) error {
chst.AddLog("")
return nil
}),
fsm.Event(datatransfer.DataReceived).FromMany(transferringStates...).ToNoChange().Action(func(chst *internal.ChannelState) error {
chst.AddLog("")
return nil
}),

fsm.Event(datatransfer.Cancel).FromAny().To(datatransfer.Cancelling),

fsm.Event(datatransfer.DataReceived).FromMany(transferringStates...).ToNoChange(),
fsm.Event(datatransfer.DataReceivedProgress).FromMany(transferringStates...).ToNoChange().
Action(func(chst *internal.ChannelState, delta uint64) error {
chst.Received += delta
chst.AddLog("received data")
return nil
}),

fsm.Event(datatransfer.DataSent).FromMany(transferringStates...).ToNoChange(),
fsm.Event(datatransfer.DataSent).FromMany(transferringStates...).ToNoChange().Action(func(chst *internal.ChannelState) error {
chst.AddLog("")
return nil
}),
fsm.Event(datatransfer.DataSentProgress).FromMany(transferringStates...).ToNoChange().
Action(func(chst *internal.ChannelState, delta uint64) error {
chst.Sent += delta
chst.AddLog("sent data")
return nil
}),
fsm.Event(datatransfer.DataQueued).FromMany(transferringStates...).ToNoChange(),
fsm.Event(datatransfer.DataQueued).FromMany(transferringStates...).ToNoChange().Action(func(chst *internal.ChannelState) error {
chst.AddLog("")
return nil
}),
fsm.Event(datatransfer.DataQueuedProgress).FromMany(transferringStates...).ToNoChange().
Action(func(chst *internal.ChannelState, delta uint64) error {
chst.Queued += delta
chst.AddLog("queued data")
return nil
}),
fsm.Event(datatransfer.Disconnected).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
chst.Message = datatransfer.ErrDisconnected.Error()
chst.AddLog("disconnected with err: %s", chst.Message)
return nil
}),

fsm.Event(datatransfer.Error).FromAny().To(datatransfer.Failing).Action(func(chst *internal.ChannelState, err error) error {
chst.Message = err.Error()
chst.AddLog("transfer failed with err: %s", chst.Message)
return nil
}),
fsm.Event(datatransfer.NewVoucher).FromAny().ToNoChange().
Action(func(chst *internal.ChannelState, vtype datatransfer.TypeIdentifier, voucherBytes []byte) error {
chst.Vouchers = append(chst.Vouchers, internal.EncodedVoucher{Type: vtype, Voucher: &cbg.Deferred{Raw: voucherBytes}})
chst.AddLog("got new voucher")
return nil
}),
fsm.Event(datatransfer.NewVoucherResult).FromAny().ToNoChange().
Action(func(chst *internal.ChannelState, vtype datatransfer.TypeIdentifier, voucherResultBytes []byte) error {
chst.VoucherResults = append(chst.VoucherResults,
internal.EncodedVoucherResult{Type: vtype, VoucherResult: &cbg.Deferred{Raw: voucherResultBytes}})
chst.AddLog("got new voucher result")
return nil
}),
fsm.Event(datatransfer.PauseInitiator).
FromMany(datatransfer.Requested, datatransfer.Ongoing).To(datatransfer.InitiatorPaused).
From(datatransfer.ResponderPaused).To(datatransfer.BothPaused).
FromAny().ToJustRecord(),
FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error {
chst.AddLog("")
return nil
}),
fsm.Event(datatransfer.PauseResponder).
FromMany(datatransfer.Requested, datatransfer.Ongoing).To(datatransfer.ResponderPaused).
From(datatransfer.InitiatorPaused).To(datatransfer.BothPaused).
FromAny().ToJustRecord(),
FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error {
chst.AddLog("")
return nil
}),
fsm.Event(datatransfer.ResumeInitiator).
From(datatransfer.InitiatorPaused).To(datatransfer.Ongoing).
From(datatransfer.BothPaused).To(datatransfer.ResponderPaused).
FromAny().ToJustRecord(),
FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error {
chst.AddLog("")
return nil
}),
fsm.Event(datatransfer.ResumeResponder).
From(datatransfer.ResponderPaused).To(datatransfer.Ongoing).
From(datatransfer.BothPaused).To(datatransfer.InitiatorPaused).
From(datatransfer.Finalizing).To(datatransfer.Completing).
FromAny().ToJustRecord(),
FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error {
chst.AddLog("")
return nil
}),
fsm.Event(datatransfer.FinishTransfer).
FromAny().To(datatransfer.TransferFinished).
FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord().
From(datatransfer.ResponderCompleted).To(datatransfer.Completing).
From(datatransfer.ResponderFinalizing).To(datatransfer.ResponderFinalizingTransferFinished),
From(datatransfer.ResponderFinalizing).To(datatransfer.ResponderFinalizingTransferFinished).Action(func(chst *internal.ChannelState) error {
chst.AddLog("")
return nil
}),
fsm.Event(datatransfer.ResponderBeginsFinalization).
FromAny().To(datatransfer.ResponderFinalizing).
FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord().
From(datatransfer.TransferFinished).To(datatransfer.ResponderFinalizingTransferFinished),
From(datatransfer.TransferFinished).To(datatransfer.ResponderFinalizingTransferFinished).Action(func(chst *internal.ChannelState) error {
chst.AddLog("")
return nil
}),
fsm.Event(datatransfer.ResponderCompletes).
FromAny().To(datatransfer.ResponderCompleted).
FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord().
From(datatransfer.ResponderPaused).To(datatransfer.ResponderFinalizing).
From(datatransfer.TransferFinished).To(datatransfer.Completing).
From(datatransfer.ResponderFinalizing).To(datatransfer.ResponderCompleted).
From(datatransfer.ResponderFinalizingTransferFinished).To(datatransfer.Completing),
fsm.Event(datatransfer.BeginFinalizing).FromAny().To(datatransfer.Finalizing),
fsm.Event(datatransfer.Complete).FromAny().To(datatransfer.Completing),
From(datatransfer.ResponderFinalizingTransferFinished).To(datatransfer.Completing).Action(func(chst *internal.ChannelState) error {
chst.AddLog("")
return nil
}),
fsm.Event(datatransfer.BeginFinalizing).FromAny().To(datatransfer.Finalizing).Action(func(chst *internal.ChannelState) error {
chst.AddLog("")
return nil
}),
fsm.Event(datatransfer.Complete).FromAny().To(datatransfer.Completing).Action(func(chst *internal.ChannelState) error {
chst.AddLog("")
return nil
}),
fsm.Event(datatransfer.CleanupComplete).
From(datatransfer.Cancelling).To(datatransfer.Cancelled).
From(datatransfer.Failing).To(datatransfer.Failed).
From(datatransfer.Completing).To(datatransfer.Completed),
From(datatransfer.Completing).To(datatransfer.Completed).Action(func(chst *internal.ChannelState) error {
chst.AddLog("")
return nil
}),

// will kickoff state handlers for channels that were cleaning up
fsm.Event(datatransfer.CompleteCleanupOnRestart).FromAny().ToNoChange(),
fsm.Event(datatransfer.CompleteCleanupOnRestart).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
chst.AddLog("")
return nil
}),
}

// ChannelStateEntryFuncs are handlers called as we enter different states
Expand Down
18 changes: 18 additions & 0 deletions channels/internal/internalchannel.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package internal

import (
"fmt"

"github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"

logging "github.com/ipfs/go-log/v2"

datatransfer "github.com/filecoin-project/go-data-transfer"
)

var log = logging.Logger("datatransfer")

//go:generate cbor-gen-for --map-encoding ChannelState EncodedVoucher EncodedVoucherResult

// EncodedVoucher is how the voucher is stored on disk
Expand Down Expand Up @@ -58,4 +64,16 @@ type ChannelState struct {
Message string
Vouchers []EncodedVoucher
VoucherResults []EncodedVoucherResult

Stages *datatransfer.ChannelStages
}

func (cs *ChannelState) AddLog(msg string, a ...interface{}) {
if len(a) > 0 {
msg = fmt.Sprintf(msg, a...)
}

stage := datatransfer.Statuses[cs.Status]

cs.Stages.AddLog(stage, msg)
}
39 changes: 38 additions & 1 deletion channels/internal/internalchannel_cbor_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 57 additions & 1 deletion types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@ package datatransfer

import (
"fmt"
"time"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"

cbg "github.com/whyrusleeping/cbor-gen"

"github.com/filecoin-project/go-data-transfer/encoding"
)

//go:generate cbor-gen-for ChannelID
//go:generate cbor-gen-for ChannelID ChannelStages ChannelStage Log

// TypeIdentifier is a unique string identifier for a type of encodable object in a
// registry
Expand Down Expand Up @@ -132,4 +135,57 @@ type ChannelState interface {

// Queued returns the number of bytes read from the node and queued for sending
Queued() uint64

Stages() *ChannelStages
}

type ChannelStages struct {
Stages []*ChannelStage
}

func (cs *ChannelStages) AddLog(stage, msg string) {
//log.Infof("adding log for stage <%s> msg <%s>", stage, msg)

now := curTime()
st := cs.GetStage(stage)
if st == nil {
st = &ChannelStage{
CreatedTime: now,
}
cs.Stages = append(cs.Stages, st)
}

st.Name = stage
st.UpdatedTime = now
if msg != "" && (len(st.Logs) == 0 || st.Logs[len(st.Logs)-1].Log != msg) {
st.Logs = append(st.Logs, &Log{msg, now})
}
}

func (cs *ChannelStages) GetStage(stage string) *ChannelStage {
for _, s := range cs.Stages {
if s.Name == stage {
return s
}
}

return nil
}

type ChannelStage struct {
Name string
Description string
CreatedTime cbg.CborTime
UpdatedTime cbg.CborTime
Logs []*Log
}

type Log struct {
Log string
UpdatedTime cbg.CborTime
}

func curTime() cbg.CborTime {
now := time.Now()
return cbg.CborTime(time.Unix(0, now.UnixNano()).UTC())
}

0 comments on commit bda3a51

Please sign in to comment.