Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Autoclose channels that have not been reestablished for a long time #196

Merged
merged 4 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
119 changes: 119 additions & 0 deletions automation/automation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package automation

import (
"context"
"time"

"github.com/decred/dcrlnd/channeldb"
"github.com/decred/dcrlnd/lncfg"
"github.com/decred/dcrlnd/lnrpc"
)

// Config holds the config parameters and dependencies of the automation
// server.
type Config struct {
// Automation embeds the automation config options, such as
// ForceCloseChanReestablishWait.
*lncfg.Automation

// CloseChannel should be set to the rpc server function that allows
// closing a channel.
CloseChannel func(in *lnrpc.CloseChannelRequest,
updateStream lnrpc.Lightning_CloseChannelServer) error

// DB is the channel database used to list the open channels and to read
// the accumulated ChannelReestablish wait time of each of them.
DB *channeldb.DB
}

// Server is a set of automation services for dcrlnd nodes.
type Server struct {
// cfg is the configuration the server was created with.
cfg *Config
// ctx is watched by the background check loop, which exits once the
// context is done.
ctx context.Context
// cancelCtx cancels ctx; it is invoked by Stop.
cancelCtx func()
}

// NewServer builds an automation server around the passed config. The
// returned server owns a cancellable context that Stop cancels; no background
// work is launched here.
func NewServer(cfg *Config) *Server {
	srvCtx, stop := context.WithCancel(context.Background())
	return &Server{
		cfg:       cfg,
		ctx:       srvCtx,
		cancelCtx: stop,
	}
}

// runForceCloseStaleChanReestablish autocloses channels where a remote peer
// has been online without sending ChannelReestablish messages.
//
// On every tick it walks all open channels, reads the accumulated wait time
// for each of them from the DB and, when that time has reached the configured
// ForceCloseChanReestablishWait threshold, requests a force close of the
// channel in a separate goroutine. The loop runs until the server context is
// cancelled.
func (s *Server) runForceCloseStaleChanReestablish() {
	// Use a default ticker for 1 hour, but reduce if the threshold is lower
	// than that (useful for tests).
	forceCloseInterval := time.Duration(s.cfg.ForceCloseChanReestablishWait) * time.Second
	if forceCloseInterval <= 0 {
		// time.NewTicker panics on a non-positive interval. Start already
		// guards against this, but a misconfigured (or overflowing) value
		// must not take the whole node down.
		log.Errorf("Not running stale channel force close check due to "+
			"invalid wait interval %s", forceCloseInterval)
		return
	}
	tickerInterval := time.Hour
	if forceCloseInterval < tickerInterval {
		tickerInterval = forceCloseInterval
	}
	log.Debugf("Performing force close check for stale channels based on "+
		"ChannelReestablish every %s", tickerInterval)

	ticker := time.NewTicker(tickerInterval)
	defer ticker.Stop()

	for {
		select {
		case <-s.ctx.Done():
			return
		case <-ticker.C:
		}

		log.Debugf("Time to check channels for force close due to stale " +
			"chan reestablish messages")

		chans, err := s.cfg.DB.FetchAllOpenChannels()
		if err != nil {
			log.Errorf("Unable to list open channels: %v", err)
			continue
		}

		for _, c := range chans {
			sid := c.ShortChannelID
			waitTime, err := s.cfg.DB.GetChanReestablishWaitTime(sid)
			if err != nil {
				log.Errorf("Unable to get chan reestablish msg "+
					"times for %s: %v", sid, err)
				continue
			}

			if waitTime < forceCloseInterval {
				log.Tracef("Skipping autoclose of %s due to low "+
					"wait time %s", sid, waitTime)
				continue
			}

			// Start force close.
			chanPoint := c.FundingOutpoint
			log.Infof("Starting force-close attempt of channel %s (%s) "+
				"due to channel reestablish msg wait time %s greater "+
				"than max interval %s", chanPoint,
				sid, waitTime, forceCloseInterval)
			go func() {
				req := &lnrpc.CloseChannelRequest{
					ChannelPoint: lnrpc.OutpointToChanPoint(&chanPoint),
					Force:        true,
				}

				// Keep the error local to this goroutine instead of
				// writing to the variable owned by the checking loop.
				if err := s.cfg.CloseChannel(req, nil); err != nil {
					log.Errorf("Unable to force-close channel %s: %v",
						sid, err)
				}
			}()
		}
	}
}

// Start launches the background check that force closes channels with stale
// ChannelReestablish messages. The check is only started when
// ForceCloseChanReestablishWait is greater than zero. It always returns nil.
func (s *Server) Start() error {
	if s.cfg.ForceCloseChanReestablishWait <= 0 {
		return nil
	}

	go s.runForceCloseStaleChanReestablish()
	return nil
}

// Stop cancels the server context, which makes the background check loop
// started by Start (if any) return. It always returns nil.
func (s *Server) Stop() error {
s.cancelCtx()
return nil
}
29 changes: 29 additions & 0 deletions automation/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package automation

import (
"github.com/decred/dcrlnd/build"
"github.com/decred/slog"
)

// log is the package-level logger. It is assigned during init and can be
// replaced by the caller through UseLogger or silenced through DisableLog.
var log slog.Logger

// init installs a sub-logger created by build.NewSubLogger with the "AUTO"
// tag as the package logger.
func init() {
UseLogger(build.NewSubLogger("AUTO", nil))
}

// DisableLog disables all library log output by replacing the package logger
// with slog.Disabled. Call UseLogger to enable logging again.
func DisableLog() {
UseLogger(slog.Disabled)
}

// UseLogger uses a specified Logger to output package logging info. It
// replaces the package-level logger, so all subsequent log calls of this
// package go through the passed logger.
func UseLogger(logger slog.Logger) {
log = logger
}
100 changes: 100 additions & 0 deletions channeldb/channel_reestablish.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package channeldb

import (
"time"

"github.com/btcsuite/btcwallet/walletdb"
"github.com/decred/dcrlnd/kvdb"
"github.com/decred/dcrlnd/lnwire"
)

var (
// chanReestablishWaitTimeBucket is a bucket that stores the total
// elapsed time that has been spent waiting for a channel reestablish
// message while a peer has been online.
//
// Keys are the uint64 form of a ShortChannelID and values are a
// time.Duration; both are serialized as 8 bytes using byteOrder.
chanReestablishWaitTimeBucket = []byte("chan-reestablish-wait-time")
)

// shortChanIDToBytes serializes the uint64 form of a short channel id into a
// freshly allocated 8-byte slice using byteOrder.
func shortChanIDToBytes(s lnwire.ShortChannelID) []byte {
	key := make([]byte, 8)
	byteOrder.PutUint64(key, s.ToUint64())
	return key
}

// readChanReestablishWaitTime decodes a value read from the
// chanReestablishWaitTimeBucket bucket. A value shorter than 8 bytes (which
// includes a missing, nil value) decodes to a zero duration.
func readChanReestablishWaitTime(v []byte) time.Duration {
	if len(v) >= 8 {
		return time.Duration(byteOrder.Uint64(v))
	}
	return 0
}

// putChanReestablishWaitTime writes the duration v under key in the passed
// chanReestablishWaitTimeBucket bucket, encoded as 8 bytes using byteOrder.
func putChanReestablishWaitTime(bucket walletdb.ReadWriteBucket, key []byte, v time.Duration) error {
	encoded := make([]byte, 8)
	byteOrder.PutUint64(encoded, uint64(v))
	return bucket.Put(key, encoded)
}

// AddToChanReestablishWaitTime increases the stored total time that a peer
// has been online without reestablishing the passed channel by waitTime. The
// bucket is created if it does not exist yet, and a channel without a stored
// value starts from zero.
func (d *DB) AddToChanReestablishWaitTime(chanID lnwire.ShortChannelID, waitTime time.Duration) error {
	addWaitTime := func(tx kvdb.RwTx) error {
		// Fetch the bucket, creating it on first use.
		bucket := tx.ReadWriteBucket(chanReestablishWaitTimeBucket)
		if bucket == nil {
			created, err := tx.CreateTopLevelBucket(chanReestablishWaitTimeBucket)
			if err != nil {
				return err
			}
			bucket = created
		}

		// Store the previously recorded wait time plus the new interval.
		key := shortChanIDToBytes(chanID)
		total := readChanReestablishWaitTime(bucket.Get(key)) + waitTime
		return putChanReestablishWaitTime(bucket, key, total)
	}

	return kvdb.Update(d, addWaitTime, func() {})
}

// ResetChanReestablishWaitTime stores a zero wait time for the passed
// channel, discarding the time a peer has spent online while not
// reestablishing it. This is called when a channel has been successfully
// reestablished.
func (d *DB) ResetChanReestablishWaitTime(chanID lnwire.ShortChannelID) error {
	resetWaitTime := func(tx kvdb.RwTx) error {
		// Fetch the bucket, creating it on first use.
		bucket := tx.ReadWriteBucket(chanReestablishWaitTimeBucket)
		if bucket == nil {
			created, err := tx.CreateTopLevelBucket(chanReestablishWaitTimeBucket)
			if err != nil {
				return err
			}
			bucket = created
		}

		return putChanReestablishWaitTime(bucket, shortChanIDToBytes(chanID), 0)
	}

	return kvdb.Update(d, resetWaitTime, func() {})
}

// GetChanReestablishWaitTime returns the total time (across all connections)
// that a peer has been online while NOT reestablishing a channel. A zero
// duration is returned when the bucket does not exist or holds no value for
// the channel.
func (d *DB) GetChanReestablishWaitTime(chanID lnwire.ShortChannelID) (waitTime time.Duration, err error) {
	readWaitTime := func(tx kvdb.RTx) error {
		if bucket := tx.ReadBucket(chanReestablishWaitTimeBucket); bucket != nil {
			key := shortChanIDToBytes(chanID)
			waitTime = readChanReestablishWaitTime(bucket.Get(key))
		}
		return nil
	}

	err = kvdb.View(d, readWaitTime, func() {})
	return waitTime, err
}
1 change: 1 addition & 0 deletions channeldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ func (d *DB) Path() string {
var topLevelBuckets = [][]byte{
openChannelBucket,
closedChannelBucket,
chanReestablishWaitTimeBucket, // DCR only
forwardingLogBucket,
fwdPackagesKey,
invoiceBucket,
Expand Down
5 changes: 5 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ type Config struct {

Autopilot *lncfg.AutoPilot `group:"Autopilot" namespace:"autopilot"`

Automation *lncfg.Automation `group:"Automation" namespace:"automation"`

Tor *lncfg.Tor `group:"Tor" namespace:"tor"`

SubRPCServers *subRPCServerConfigs `group:"subrpc"`
Expand Down Expand Up @@ -439,6 +441,9 @@ func DefaultConfig() Config {
"top_centrality": 1.0,
},
},
Automation: &lncfg.Automation{
ForceCloseChanReestablishWait: 60 * 60 * 24 * 3, // 3 days
},
PaymentsExpirationGracePeriod: defaultPaymentsExpirationGracePeriod,
TrickleDelay: defaultTrickleDelay,
ChanStatusSampleInterval: defaultChanStatusSampleInterval,
Expand Down
22 changes: 22 additions & 0 deletions htlcswitch/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,10 @@ func (l *channelLink) syncChanStates() error {
"ChannelPoint(%v): %v", l.channel.ChannelPoint(), err)
}

// Track how long the peer is online while NOT sending a
// ChannelReestablish.
sendTime := time.Now()

var msgsToReSend []lnwire.Message

// Next, we'll wait indefinitely to receive the ChanSync message. The
Expand Down Expand Up @@ -740,7 +744,25 @@ func (l *channelLink) syncChanStates() error {
l.cfg.Peer.SendMessage(false, msg)
}

// Channel reestablish successfully received, so reset the wait
// time.
err := l.cfg.Switch.cfg.DB.ResetChanReestablishWaitTime(l.shortChanID)
if err != nil {
l.log.Errorf("Unable to reset ChannelReestblish wait "+
"time: %v", err)
}

case <-l.quit:
// Channel reestablish not received while peer was online. Track
// how long we waited for a ChannelReestablish message.
waitTime := time.Since(sendTime)
l.log.Debugf("Adding +%s to channel reestablish wait time", waitTime)
err := l.cfg.Switch.cfg.DB.AddToChanReestablishWaitTime(l.shortChanID, waitTime)
if err != nil {
l.log.Errorf("Unable to track wait time for "+
"ChannelReestblish msg: %v", err)
}

return ErrLinkShuttingDown
}

Expand Down
9 changes: 9 additions & 0 deletions lncfg/automation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package lncfg

// Automation holds some server level automation config options.
type Automation struct {
// ForceCloseChanReestablishWait is the time, in seconds, after which the
// automation server force closes a channel where the local peer has sent
// ChannelReestablish messages but the remote peer does not. The
// automation server only starts this check when the value is greater
// than zero.
ForceCloseChanReestablishWait int64 `long:"closechanreestablishwait" description:"Force close a channel if the difference between time channel reestablish msgs were sent and received is higher than the specified one"`
}
12 changes: 12 additions & 0 deletions lnd.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"gopkg.in/macaroon.v2"

"github.com/decred/dcrd/txscript/v4/stdaddr"
"github.com/decred/dcrlnd/automation"
"github.com/decred/dcrlnd/autopilot"
"github.com/decred/dcrlnd/build"
"github.com/decred/dcrlnd/cert"
Expand Down Expand Up @@ -988,6 +989,17 @@ func Main(cfg *Config, lisCfg ListenerCfg, interceptor signal.Interceptor) error
defer tower.Stop()
}

// Start the dcrlnd specific automation services.
autoServer := automation.NewServer(&automation.Config{
Automation: cfg.Automation,
CloseChannel: rpcServer.CloseChannel,
DB: remoteChanDB,
})
if err := autoServer.Start(); err != nil {
return err
}
defer autoServer.Stop()

// Wait for shutdown signal from either a graceful server stop or from
// the interrupt handler.
<-interceptor.ShutdownChannel()
Expand Down