Skip to content

Commit

Permalink
plugin: add rpc method lnmetrics-force-update
Browse files Browse the repository at this point in the history
Changelog-Added: add rpc method `lnmetrics-force-update`

Signed-off-by: Vincenzo Palazzo <vincenzopalazzodev@gmail.com>
  • Loading branch information
vincenzopalazzo committed Apr 2, 2022
1 parent 5c76377 commit 91499c4
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 14 deletions.
32 changes: 32 additions & 0 deletions internal/plugin/diagnostic.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,35 @@ func (instance *MetricOneRpcMethod) Call() (jrpc2.Result, error) {

return nil, fmt.Errorf("We don't support the filter operation right now")
}

// ForceUpdateRPC enable the force update command
type ForceUpdateRPC struct {
// the instance of the plugin
plugin *MetricsPlugin
}

func NewForceUpdateRPC(plugin *MetricsPlugin) *ForceUpdateRPC {
return &ForceUpdateRPC{plugin}
}

func (instance *ForceUpdateRPC) New() interface{} {
return instance
}

func (instance *ForceUpdateRPC) Name() string {
return "lnmetrics-force-update"
}

func (instance *ForceUpdateRPC) Call() (jrpc2.Result, error) {
for _, metric := range instance.plugin.Metrics {
msg := Msg{
cmd: "plugin_rpc_method",
params: map[string]interface{}{"event": "on_force_update"},
}
instance.plugin.callUpdateOnMetric(metric, &msg)
}
response := struct {
result string
}{result: "force call update on all the metrics succeeded"}
return response, nil
}
31 changes: 22 additions & 9 deletions internal/plugin/metrics_one.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,19 @@ func (instance *MetricOne) Update(lightning *glightning.Lightning) error {

func (instance *MetricOne) UpdateWithMsg(message *Msg,
lightning *glightning.Lightning) error {
return fmt.Errorf("method not supported")
if event, ok := message.params["event"]; ok {
status, err := instance.onEvent(fmt.Sprintf("%s", event), lightning)
if err != nil {
return err
}
instance.UpTime = append(instance.UpTime, status)
instance.lastCheck = time.Now().Unix()
if status.Timestamp > 0 {
instance.lastCheck = status.Timestamp
}
return instance.MakePersistent()
}
return nil
}

func (instance *MetricOne) MakePersistent() error {
Expand Down Expand Up @@ -569,12 +581,13 @@ func (instance *MetricOne) UploadOnRepo(client *graphql.Client, lightning *gligh
instance.UpTime = make([]*status, 0)
instance.ChannelsInfo = make(map[string]*statusChannel)

// Refactored this method in a utils functions
// Refactored this method in an utils functions
t := time.Now()
log.GetInstance().Info(fmt.Sprintf("Metric One Upload at %s", t.Format(time.RFC850)))
return nil
}

// checkChannelInCache check if a node with channel_id is inside the gossip map or in the cache
func (instance *MetricOne) checkChannelInCache(lightning *glightning.Lightning, channelID string) (*cache.NodeInfoCache, error) {
var nodeInfo cache.NodeInfoCache
inCache := false
Expand All @@ -595,7 +608,6 @@ func (instance *MetricOne) checkChannelInCache(lightning *glightning.Lightning,
node, err := lightning.GetNode(channelID)
if err != nil {
log.GetInstance().Error(fmt.Sprintf("Error in command listNodes in makeChannelsSummary: %s", err))
// We admit this error, a node can be forgotten by the gossip if it is offline for long time.
return nil, err
}
nodeInfo = cache.NodeInfoCache{
Expand All @@ -611,7 +623,7 @@ func (instance *MetricOne) checkChannelInCache(lightning *glightning.Lightning,
return &nodeInfo, nil
}

// Make a summary of all the channels information that the node have a channels with.
// makeChannelsSummary Make a summary of all the channels information that the node have a channels with.
func (instance *MetricOne) makeChannelsSummary(lightning *glightning.Lightning, channels []*glightning.FundingChannel) (*ChannelsSummary, error) {
channelsSummary := &ChannelsSummary{
TotChannels: 0,
Expand All @@ -621,7 +633,6 @@ func (instance *MetricOne) makeChannelsSummary(lightning *glightning.Lightning,
if len(channels) > 0 {
summary := make([]*ChannelSummary, 0)
for _, channel := range channels {

if channel.State == "ONCHAIN" {
// When the channel is on chain, it is not longer a channel,
// it stay in the listfunds for 100 block (bitcoin time) after the closing commitment
Expand All @@ -637,9 +648,9 @@ func (instance *MetricOne) makeChannelsSummary(lightning *glightning.Lightning,

nodeInfo, err := instance.checkChannelInCache(lightning, channel.Id)
if err != nil {
// THIS should never happen, because can corrupt
// the metrics
return nil, err
// the node is not in the cache and in the gossip map
// skip this should be fine too
continue
}
channelsSummary.TotChannels++
channelSummary.Alias = nodeInfo.Alias
Expand Down Expand Up @@ -676,7 +687,6 @@ func (instance *MetricOne) makePaymentsSummary(lightning *glightning.Lightning,
func (instance *MetricOne) collectInfoChannels(lightning *glightning.Lightning, channels []*glightning.FundingChannel, event string) error {
cache := make(map[string]bool)
for _, channel := range channels {

switch channel.State {
// state of a channel where there is any type of communication yet
// we skip this type of state
Expand Down Expand Up @@ -960,6 +970,9 @@ func (instance *MetricOne) getChannelInfo(lightning *glightning.Lightning,

//FIXME put inside the utils functions
func getMSatValue(msatStr string) int64 {
if !strings.Contains(msatStr, "msat") {
return -1
}
msatTokens := strings.Split(msatStr, "msat")
if len(msatTokens) == 0 {
return -1
Expand Down
17 changes: 12 additions & 5 deletions internal/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"time"

"github.com/robfig/cron/v3"
cron "github.com/robfig/cron/v3"
"github.com/vincenzopalazzo/glightning/glightning"

"github.com/LNOpenMetrics/go-lnmetrics.reporter/internal/db"
Expand Down Expand Up @@ -75,25 +75,32 @@ func (plugin *MetricsPlugin) RegisterMethods() error {
if err := plugin.Plugin.RegisterMethod(cacheRPCMethod); err != nil {
return err
}

forceUpdate := NewForceUpdateRPC(plugin)
forceUpdateRPC := glightning.NewRpcMethod(forceUpdate, "call the update on all the plugin")
forceUpdateRPC.Category = "lnmetrics"
if err := plugin.Plugin.RegisterMethod(forceUpdateRPC); err != nil {
return err
}
return nil
}

//nolint
func (plugin *MetricsPlugin) callUpdateOnMetric(metric Metric, msg *Msg) {
if err := metric.UpdateWithMsg(msg, plugin.Rpc); err != nil {
log.GetInstance().Error(fmt.Sprintf("Error during update metrics event: %s", err))
log.GetInstance().Errorf("Error during update metrics event: %s", err)
}
}

// Call on stop operation on the node when the caller are shoutdown it self.
// callOnStopOnMetrics Call on stop operation on the node when the caller are shutdown itself.
func (plugin *MetricsPlugin) callOnStopOnMetrics(metric Metric, msg *Msg) {
err := metric.OnStop(msg, plugin.Rpc)
if err != nil {
log.GetInstance().Error(err)
}
}

// Update the metrics without any information received by the caller
// callUpdateOnMetricNoMsg Update the metrics without any information received by the caller
func (plugin *MetricsPlugin) callUpdateOnMetricNoMsg(metric Metric) {
log.GetInstance().Debug("Calling Update on metrics")
err := metric.Update(plugin.Rpc)
Expand All @@ -110,7 +117,7 @@ func (plugin *MetricsPlugin) updateAndUploadMetric(metric Metric) {
}
}

// Register internal recurrent methods
// RegisterRecurrentEvt Register internal recurrent methods
func (plugin *MetricsPlugin) RegisterRecurrentEvt(after string) {
log.GetInstance().Info(fmt.Sprintf("Register recurrent event each %s", after))
plugin.Cron = cron.New()
Expand Down

0 comments on commit 91499c4

Please sign in to comment.