diff --git a/node/modules/rpcstatemanager.go b/chain/stmgr/rpc/rpcstatemanager.go similarity index 99% rename from node/modules/rpcstatemanager.go rename to chain/stmgr/rpc/rpcstatemanager.go index b14e1dc8008..98ea315db9f 100644 --- a/node/modules/rpcstatemanager.go +++ b/chain/stmgr/rpc/rpcstatemanager.go @@ -1,4 +1,4 @@ -package modules +package rpcstmgr import ( "context" diff --git a/node/builder.go b/node/builder.go index ee0a66e5322..df183b2162b 100644 --- a/node/builder.go +++ b/node/builder.go @@ -11,6 +11,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/exchange" + rpcstmgr "github.com/filecoin-project/lotus/chain/stmgr/rpc" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/vm" "github.com/filecoin-project/lotus/chain/wallet" @@ -307,9 +308,10 @@ var ChainNode = Options( Override(new(api.WalletAPI), From(new(wallet.MultiWallet))), // Service: Payment channels - Override(new(*paychmgr.Store), paychmgr.NewStore), - Override(new(*paychmgr.Manager), paychmgr.NewManager), - Override(HandlePaymentChannelManagerKey, paychmgr.HandleManager), + Override(new(paychmgr.PaychAPI), From(new(modules.PaychAPI))), + Override(new(*paychmgr.Store), modules.NewPaychStore), + Override(new(*paychmgr.Manager), modules.NewManager), + Override(HandlePaymentChannelManagerKey, modules.HandlePaychManager), Override(SettlePaymentChannelsKey, settler.SettlePaymentChannels), // Markets (common) @@ -334,7 +336,7 @@ var ChainNode = Options( Override(new(full.GasModuleAPI), From(new(api.GatewayAPI))), Override(new(full.MpoolModuleAPI), From(new(api.GatewayAPI))), Override(new(full.StateModuleAPI), From(new(api.GatewayAPI))), - Override(new(stmgr.StateManagerAPI), modules.NewRPCStateManager), + Override(new(stmgr.StateManagerAPI), rpcstmgr.NewRPCStateManager), ), // Full node API / service startup diff --git a/node/modules/paych.go b/node/modules/paych.go new file mode 100644 index 00000000000..a9fd25a3ec9 --- /dev/null +++ b/node/modules/paych.go @@ -0,0 +1,45 @@ +package modules + +import ( + "context" + + "github.com/filecoin-project/lotus/chain/stmgr" + "github.com/filecoin-project/lotus/node/impl/full" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/node/modules/helpers" + "github.com/filecoin-project/lotus/paychmgr" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + "go.uber.org/fx" +) + +func NewManager(mctx helpers.MetricsCtx, lc fx.Lifecycle, sm stmgr.StateManagerAPI, pchstore *paychmgr.Store, api paychmgr.PaychAPI) *paychmgr.Manager { + ctx := helpers.LifecycleCtx(mctx, lc) + ctx, shutdown := context.WithCancel(ctx) + + return paychmgr.NewManager(ctx, shutdown, sm, pchstore, api) +} + +func NewPaychStore(ds dtypes.MetadataDS) *paychmgr.Store { + ds = namespace.Wrap(ds, datastore.NewKey("/paych/")) + return paychmgr.NewStore(ds) +} + +type PaychAPI struct { + fx.In + + full.MpoolAPI + full.StateAPI +} + +// HandlePaychManager is called by dependency injection to set up hooks +func HandlePaychManager(lc fx.Lifecycle, pm *paychmgr.Manager) { + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + return pm.Start() + }, + OnStop: func(context.Context) error { + return pm.Stop() + }, + }) +} diff --git a/paychmgr/manager.go b/paychmgr/manager.go index 5e0aa88cea8..e9700bc9d14 100644 --- a/paychmgr/manager.go +++ b/paychmgr/manager.go @@ -8,7 +8,6 @@ import ( "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log/v2" - "go.uber.org/fx" xerrors "golang.org/x/xerrors" "github.com/filecoin-project/go-address" @@ -19,22 +18,12 @@ import ( "github.com/filecoin-project/lotus/chain/actors/builtin/paych" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/node/impl/full" - "github.com/filecoin-project/lotus/node/modules/helpers" ) var log = logging.Logger("paych") var errProofNotSupported = errors.New("payment channel proof parameter is not supported") -// PaychAPI is used by dependency injection to pass the consituent APIs to NewManager() -type PaychAPI struct { - fx.In - - full.MpoolAPI - full.StateAPI -} - // stateManagerAPI defines the methods needed from StateManager type stateManagerAPI interface { ResolveToKeyAddress(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) @@ -43,7 +32,7 @@ type stateManagerAPI interface { } // paychAPI defines the API methods needed by the payment channel manager -type paychAPI interface { +type PaychAPI interface { StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) MpoolPushMessage(ctx context.Context, msg *types.Message, maxFee *api.MessageSendSpec) (*types.SignedMessage, error) @@ -55,13 +44,13 @@ type paychAPI interface { // managerAPI defines all methods needed by the manager type managerAPI interface { stateManagerAPI - paychAPI + PaychAPI } // managerAPIImpl is used to create a composite that implements managerAPI type managerAPIImpl struct { stmgr.StateManagerAPI - paychAPI + PaychAPI } type Manager struct { @@ -77,11 +66,8 @@ type Manager struct { channels map[string]*channelAccessor } -func NewManager(mctx helpers.MetricsCtx, lc fx.Lifecycle, sm stmgr.StateManagerAPI, pchstore *Store, api PaychAPI) *Manager { - ctx := helpers.LifecycleCtx(mctx, lc) - ctx, shutdown := context.WithCancel(ctx) - - impl := &managerAPIImpl{StateManagerAPI: sm, paychAPI: &api} +func NewManager(ctx context.Context, shutdown func(), sm stmgr.StateManagerAPI, pchstore *Store, api PaychAPI) *Manager { + impl := &managerAPIImpl{StateManagerAPI: sm, PaychAPI: api} return &Manager{ ctx: ctx, shutdown: shutdown, @@ -103,18 +89,6 @@ func newManager(pchstore *Store, pchapi managerAPI) (*Manager, error) { return pm, pm.Start() } -// HandleManager is called by dependency injection to set up hooks -func HandleManager(lc fx.Lifecycle, pm *Manager) { - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - return pm.Start() - }, - OnStop: func(context.Context) error { - return pm.Stop() - }, - }) -} - // Start restarts tracking of any messages that were sent to chain. func (pm *Manager) Start() error { return pm.restartPending() diff --git a/paychmgr/store.go b/paychmgr/store.go index a17ad1fcd80..343149f932e 100644 --- a/paychmgr/store.go +++ b/paychmgr/store.go @@ -14,14 +14,12 @@ import ( cborutil "github.com/filecoin-project/go-cbor-util" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" dsq "github.com/ipfs/go-datastore/query" "github.com/filecoin-project/go-address" cborrpc "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/lotus/chain/actors/builtin/paych" - "github.com/filecoin-project/lotus/node/modules/dtypes" ) var ErrChannelNotTracked = errors.New("channel not tracked") @@ -30,8 +28,7 @@ type Store struct { ds datastore.Batching } -func NewStore(ds dtypes.MetadataDS) *Store { - ds = namespace.Wrap(ds, datastore.NewKey("/paych/")) +func NewStore(ds datastore.Batching) *Store { return &Store{ ds: ds, }