Skip to content

Commit

Permalink
lnd: properly execute force closures kicked off via RPC
Browse files Browse the repository at this point in the history
This commit includes some slight refactoring to properly execute force
closures which are initiated by RPC clients.

The CloseLink method within the htlcSwitch has been extended to take an
additional parameter which indicates if the link should be closed
forcefully. If so, then the channelManager which dispatches the request
executes a force closure using the target channel state machine. Once
the closing transaction has been broadcast, the summary is sent to the
utxoNursery so the outputs can be swept once they’re mature.
  • Loading branch information
Roasbeef committed Sep 13, 2016
1 parent bfa2a1d commit 80b09f7
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 35 deletions.
27 changes: 21 additions & 6 deletions htlcswitch.go
Expand Up @@ -63,7 +63,13 @@ type htlcSwitch struct {
started int32 // atomic
shutdown int32 // atomic

chanIndex map[wire.OutPoint]*link
// chanIndex maps a channel's outpoint to a link which contains
// additional information about the channel, and additionally houses a
// pointer to the peer mangaing the channel.
chanIndex map[wire.OutPoint]*link

// interfaces maps a node's ID to the set of links (active channels) we
// currently have open with that peer.
interfaces map[wire.ShaHash][]*link

// TODO(roasbeef): msgs for dynamic link quality
Expand Down Expand Up @@ -395,20 +401,29 @@ func (h *htlcSwitch) UnregisterLink(chanInterface [32]byte, chanPoint *wire.OutP
// closeChanReq represents a request to close a particular channel specified
// by its outpoint.
type closeLinkReq struct {
chanPoint *wire.OutPoint
chanPoint *wire.OutPoint
forceClose bool

updates chan *lnrpc.CloseStatusUpdate
err chan error
}

// CloseLink closes an active link targetted by it's channel point. Closing the
// link initiates a cooperative channel closure.
// TODO(roabeef): bool flag for timeout/force
func (h *htlcSwitch) CloseLink(chanPoint *wire.OutPoint) (chan *lnrpc.CloseStatusUpdate, chan error) {
// link initiates a cooperative channel closure iff forceClose is false. If
// forceClose is true, then a unilateral channel closure is executed.
// TODO(roabeef): bool flag for timeout
func (h *htlcSwitch) CloseLink(chanPoint *wire.OutPoint,
forceClose bool) (chan *lnrpc.CloseStatusUpdate, chan error) {

updateChan := make(chan *lnrpc.CloseStatusUpdate, 1)
errChan := make(chan error, 1)

h.linkControl <- &closeLinkReq{chanPoint, updateChan, errChan}
h.linkControl <- &closeLinkReq{
chanPoint: chanPoint,
forceClose: forceClose,
updates: updateChan,
err: errChan,
}

return updateChan, errChan
}
Expand Down
114 changes: 92 additions & 22 deletions peer.go
Expand Up @@ -8,6 +8,7 @@ import (
"sync/atomic"
"time"

"github.com/BitfuryLightning/tools/rt/graph"
"github.com/btcsuite/fastsha256"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/channeldb"
Expand All @@ -19,7 +20,6 @@ import (
"github.com/roasbeef/btcd/txscript"
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
"github.com/BitfuryLightning/tools/rt/graph"
)

var (
Expand Down Expand Up @@ -624,46 +624,102 @@ out:
p.wg.Done()
}

// handleLocalClose kicks-off the workflow to execute a cooperative closure of
// the channel initiated by a local sub-system.
func (p *peer) handleLocalClose(req *closeLinkReq) {
chanPoint := req.chanPoint
key := wire.OutPoint{
Hash: chanPoint.Hash,
Index: chanPoint.Index,
// executeForceClose executes a unilateral close of the target channel by
// broadcasting the current commitment state directly on-chain. Once the
// commitment transaction has been broadcast, a struct describing the final
// state of the channel is sent to the utxoNursery in order to ultimatley sweep
// the immature outputs.
func (p *peer) executeForceClose(channel *lnwallet.LightningChannel) (*wire.ShaHash, error) {
// Execute a unilateral close shutting down all further channel
// operation.
closeSummary, err := channel.ForceClose()
if err != nil {
return nil, err
}
channel := p.activeChannels[key]

closeTx := closeSummary.CloseTx
txid := closeTx.TxSha()

// With the close transaction in hand, broadcast the transaction to the
// network, thereby entering the psot channel resolution state.
peerLog.Infof("Broadcasting force close transaction: %v",
channel.ChannelPoint(), newLogClosure(func() string {
return spew.Sdump(closeTx)
}))
if err := p.server.lnwallet.PublishTransaction(closeTx); err != nil {
return nil, err
}

// Send the closed channel sumary over to the utxoNursery in order to
// have its outputs sweeped back into the wallet once they're mature.
p.server.utxoNursery.incubateOutputs(closeSummary)

return &txid, nil
}

// executeCooperativeClose executes the initial phase of a user-executed
// cooperative channel close. The channel state machine is transitioned to the
// closing phase, then our half of the closing witness is sent over to the
// remote peer.
func (p *peer) executeCooperativeClose(channel *lnwallet.LightningChannel) (*wire.ShaHash, error) {
// Shift the channel state machine into a 'closing' state. This
// generates a signature for the closing tx, as well as a txid of the
// closing tx itself, allowing us to watch the network to determine
// when the remote node broadcasts the fully signed closing transaction.
// when the remote node broadcasts the fully signed closing
// transaction.
sig, txid, err := channel.InitCooperativeClose()
if err != nil {
req.err <- err
return
return nil, err
}

chanPoint := channel.ChannelPoint()
peerLog.Infof("Executing cooperative closure of "+
"ChanPoint(%v) with peerID(%v), txid=%v", key, p.id,
txid)
"ChanPoint(%v) with peerID(%v), txid=%v", chanPoint, p.id, txid)

// With our signature for the close tx generated, send the signature
// to the remote peer instructing it to close this particular channel
// With our signature for the close tx generated, send the signature to
// the remote peer instructing it to close this particular channel
// point.
// TODO(roasbeef): remove encoding redundancy
closeSig, err := btcec.ParseSignature(sig, btcec.S256())
if err != nil {
req.err <- err
return
return nil, err
}
closeReq := lnwire.NewCloseRequest(chanPoint, closeSig)
p.queueMsg(closeReq, nil)

return txid, nil
}

// handleLocalClose kicks-off the workflow to execute a cooperative or forced
// unilateral closure of the channel initiated by a local sub-system.
func (p *peer) handleLocalClose(req *closeLinkReq) {
var (
err error
closingTxid *wire.ShaHash
)

channel := p.activeChannels[*req.chanPoint]

if req.forceClose {
closingTxid, err = p.executeForceClose(channel)
peerLog.Infof("Force closing ChannelPoint(%v) with txid: %v",
req.chanPoint, closingTxid)
} else {
closingTxid, err = p.executeCooperativeClose(channel)
peerLog.Infof("Attempting cooperative close of "+
"ChannelPoint(%v) with txid: %v", req.chanPoint,
closingTxid)
}
if err != nil {
req.err <- err
return
}

// Update the caller w.r.t the current pending state of this request.
req.updates <- &lnrpc.CloseStatusUpdate{
Update: &lnrpc.CloseStatusUpdate_ClosePending{
ClosePending: &lnrpc.PendingUpdate{
Txid: txid[:],
Txid: closingTxid[:],
},
},
}
Expand All @@ -673,7 +729,8 @@ func (p *peer) handleLocalClose(req *closeLinkReq) {
// confirmation.
go func() {
// TODO(roasbeef): add param for num needed confs
confNtfn, err := p.server.chainNotifier.RegisterConfirmationsNtfn(txid, 1)
notifier := p.server.chainNotifier
confNtfn, err := notifier.RegisterConfirmationsNtfn(closingTxid, 1)
if err != nil {
req.err <- err
return
Expand All @@ -692,7 +749,7 @@ func (p *peer) handleLocalClose(req *closeLinkReq) {
// The channel has been closed, remove it from any
// active indexes, and the database state.
peerLog.Infof("ChannelPoint(%v) is now "+
"closed at height %v", key, height)
"closed at height %v", req.chanPoint, height)
if err := wipeChannel(p, channel); err != nil {
req.err <- err
return
Expand All @@ -706,7 +763,7 @@ func (p *peer) handleLocalClose(req *closeLinkReq) {
req.updates <- &lnrpc.CloseStatusUpdate{
Update: &lnrpc.CloseStatusUpdate_ChanClose{
ChanClose: &lnrpc.ChannelCloseUpdate{
ClosingTxid: txid[:],
ClosingTxid: closingTxid[:],
Success: true,
},
},
Expand Down Expand Up @@ -867,6 +924,19 @@ func (p *peer) htlcManager(channel *lnwallet.LightningChannel,
out:
for {
select {
case <-channel.UnilateralCloseSignal:
peerLog.Warnf("Remote peer has closed ChannelPoint(%v) on-chain",
state.chanPoint)
if err := wipeChannel(p, channel); err != nil {
peerLog.Errorf("Unable to wipe channel %v", err)
}
break out
case <-channel.ForceCloseSignal:
peerLog.Warnf("ChannelPoint(%v) has been force "+
"closed, disconnecting from peerID(%x)",
state.chanPoint, p.id)
break out
//p.Disconnect()
// TODO(roasbeef): prevent leaking ticker?
case <-state.logCommitTimer:
// If we haven't sent or received a new commitment
Expand Down
15 changes: 8 additions & 7 deletions rpcserver.go
Expand Up @@ -246,6 +246,7 @@ out:
func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,
updateStream lnrpc.Lightning_CloseChannelServer) error {

force := in.Force
index := in.ChannelPoint.OutputIndex
txid, err := wire.NewShaHash(in.ChannelPoint.FundingTxid)
if err != nil {
Expand All @@ -257,7 +258,7 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,
rpcsLog.Tracef("[closechannel] request for ChannelPoint(%v)",
targetChannelPoint)

updateChan, errChan := r.server.htlcSwitch.CloseLink(targetChannelPoint)
updateChan, errChan := r.server.htlcSwitch.CloseLink(targetChannelPoint, force)

out:
for {
Expand All @@ -279,7 +280,7 @@ out:
switch closeUpdate := closingUpdate.Update.(type) {
case *lnrpc.CloseStatusUpdate_ChanClose:
h, _ := wire.NewShaHash(closeUpdate.ChanClose.ClosingTxid)
rpcsLog.Errorf("[closechannel] close completed: "+
rpcsLog.Infof("[closechannel] close completed: "+
"txid(%v)", h)
break out
}
Expand Down Expand Up @@ -492,11 +493,11 @@ func (r *rpcServer) ShowRoutingTable(ctx context.Context,
for _, channel := range rtCopy.AllChannels() {
channels = append(channels,
&lnrpc.RoutingTableLink{
Id1: channel.Id1.String(),
Id2: channel.Id2.String(),
Outpoint: channel.EdgeID.String(),
Capacity: channel.Info.Capacity(),
Weight: channel.Info.Weight(),
Id1: channel.Id1.String(),
Id2: channel.Id2.String(),
Outpoint: channel.EdgeID.String(),
Capacity: channel.Info.Capacity(),
Weight: channel.Info.Weight(),
},
)
}
Expand Down

0 comments on commit 80b09f7

Please sign in to comment.