From d99abce1f3741207399b06922f9ab0f9fde5a767 Mon Sep 17 00:00:00 2001 From: Vincenzo Palazzo Date: Wed, 30 Mar 2022 01:01:50 +0200 Subject: [PATCH] plugin: introduce the cache system to speed up the plugin with big node Changelog-Fixed: introduce the cache system to speed up the plugin with big node Signed-off-by: Vincenzo Palazzo --- .gitignore | 2 + go.mod | 2 +- go.sum | 2 + internal/cache/cache_manager.go | 124 ++++++++++++++++++++++++++++++++ internal/cache/cache_model.go | 16 +++++ internal/plugin/metrics_one.go | 96 +++++++++++++++++-------- 6 files changed, 211 insertions(+), 31 deletions(-) create mode 100644 internal/cache/cache_manager.go create mode 100644 internal/cache/cache_model.go diff --git a/.gitignore b/.gitignore index 7eaed55..d99e925 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ vendor go-lnmetrics *.out + +.idea/ diff --git a/go.mod b/go.mod index 4481ef1..dc838fb 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/LNOpenMetrics/go-lnmetrics.reporter go 1.17 require ( - github.com/LNOpenMetrics/lnmetrics.utils v0.0.3 + github.com/LNOpenMetrics/lnmetrics.utils v0.0.6 github.com/elastic/go-sysinfo v1.7.1 github.com/kinbiko/jsonassert v1.0.2 github.com/robfig/cron/v3 v3.0.1 diff --git a/go.sum b/go.sum index d3ca26a..6813343 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/LNOpenMetrics/lnmetrics.utils v0.0.2 h1:USJE6C1dhiNUjiQ29iQ3MHxxZt9Y0 github.com/LNOpenMetrics/lnmetrics.utils v0.0.2/go.mod h1:6PC0XEUljl08AHdHdMQtR2tGJ1o9Jzg8yt7nioTnlE4= github.com/LNOpenMetrics/lnmetrics.utils v0.0.3 h1:emd43at2elU8FDAeJhTDNkMF1/cbYR0lFS0iyka34Go= github.com/LNOpenMetrics/lnmetrics.utils v0.0.3/go.mod h1:6PC0XEUljl08AHdHdMQtR2tGJ1o9Jzg8yt7nioTnlE4= +github.com/LNOpenMetrics/lnmetrics.utils v0.0.6 h1:fO6nTbMsLzJDEHRRfyS+QUSEzmyXHCaOWOpmttlYGrE= +github.com/LNOpenMetrics/lnmetrics.utils v0.0.6/go.mod h1:6PC0XEUljl08AHdHdMQtR2tGJ1o9Jzg8yt7nioTnlE4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/internal/cache/cache_manager.go b/internal/cache/cache_manager.go new file mode 100644 index 0000000..aa5354e --- /dev/null +++ b/internal/cache/cache_manager.go @@ -0,0 +1,124 @@ +package cache + +import ( + "encoding/json" + "fmt" + db "github.com/LNOpenMetrics/lnmetrics.utils/db/leveldb" + "github.com/LNOpenMetrics/lnmetrics.utils/log" + "strings" +) + +// cacheManager is the internal rappresentation +// of the cache manager, that contains all the utils +// function to manage the cache +type cacheManager struct { + prefix string + cacheIdx string + cache map[string]*string +} + +func GetInstance() *cacheManager { + return &cacheManager{ + prefix: "cache", + cacheIdx: "cache/idx", + cache: nil, + } +} + +func (instance *cacheManager) buildID(key string) string { + return strings.Join([]string{instance.prefix, key}, "/") +} + +func (instance *cacheManager) initCache() error { + instance.cache = make(map[string]*string) + arrJson, err := json.Marshal(instance.cache) + if err != nil { + return err + } + if err := db.GetInstance().PutValueInBytes(instance.cacheIdx, arrJson); err != nil { + log.GetInstance().Errorf("%s", err) + return err + } + return nil +} + +func (instance *cacheManager) getCacheIndex() (map[string]*string, error) { + if instance.cache == nil { + if err := instance.initCache(); err != nil { + return nil, err + } + return instance.getCacheIndex() + } + value, err := db.GetInstance().GetValueInBytes(instance.cacheIdx) + if err != nil { + return nil, err + } + if err := json.Unmarshal(value, &instance.cache); err != nil { + return nil, err + } + return instance.cache, nil +} + +func (instance *cacheManager) addToCache(key string) error { + idx := instance.cache + if idx == nil { + tmpIdx, err := instance.getCacheIndex() + if err != nil { + return err + } + idx = tmpIdx + } + if _, ok := idx[key]; !ok { + idx[key] = &key + } + return nil +} + +func (instance *cacheManager) IsInCache(key string) bool { + if instance.cache != nil { + _, ok := instance.cache[key] + if ok { + return ok + } + // otherwise, continue and check with the database + } + key = instance.buildID(key) + _, err := db.GetInstance().GetValue(key) + if err != nil { + log.GetInstance().Errorf("Error inside the cache: %s", err) + return false + } + return true +} + +// GetFromCache retrieval the information that are in the cache in bytes +func (instance *cacheManager) GetFromCache(key string) ([]byte, error) { + if instance.IsInCache(key) { + key = instance.buildID(key) + return db.GetInstance().GetValueInBytes(key) + } + return nil, fmt.Errorf("no value with key %s in the cache", key) +} + +// PutToCache put the value in the json form inside the cache with the key specified +func (instance *cacheManager) PutToCache(key string, value interface{}) error { + jsonValue, err := json.Marshal(value) + if err != nil { + return err + } + key = instance.buildID(key) + if err := db.GetInstance().PutValueInBytes(key, jsonValue); err != nil { + return err + } + return instance.addToCache(key) +} + +// CleanCache clean the index from the database +func (instance *cacheManager) CleanCache() error { + for key := range instance.cache { + if err := db.GetInstance().DeleteValue(key); err != nil { + return err + } + } + return nil +} diff --git a/internal/cache/cache_model.go b/internal/cache/cache_model.go new file mode 100644 index 0000000..8f73ad8 --- /dev/null +++ b/internal/cache/cache_model.go @@ -0,0 +1,16 @@ +// Package cache package implement the basic block to implement the +// cache persistence +package cache + +import ( + "github.com/vincenzopalazzo/glightning/glightning" +) + +// NodeInfoCache implement the interface to +// store the node information inside the cache. +type NodeInfoCache struct { + ID string + Alias string + Color string + Features *glightning.Hexed +} diff --git a/internal/plugin/metrics_one.go b/internal/plugin/metrics_one.go index 0b0ad7d..55f1698 100644 --- a/internal/plugin/metrics_one.go +++ b/internal/plugin/metrics_one.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/LNOpenMetrics/go-lnmetrics.reporter/internal/cache" "reflect" "strconv" "strings" @@ -148,7 +149,7 @@ type PaymentsSummary struct { Failed uint64 `json:"failed"` } -// Contains the info about the ln node. +// NodeInfo Contains the info about the ln node. type NodeInfo struct { Implementation string `json:"implementation"` Version string `json:"version"` @@ -160,7 +161,7 @@ type NodeAddress struct { Port uint `json:"port"` } -// Main data structure that it is filled by the collection data phase. +// MetricOne Main data structure that it is filled by the collection data phase. type MetricOne struct { // Internal id to identify the metric id int `json:"-"` @@ -211,7 +212,7 @@ type MetricOne struct { Storage db.PluginDatabase `json:"-"` } -func (m MetricOne) MarshalJSON() ([]byte, error) { +func (instance MetricOne) MarshalJSON() ([]byte, error) { // Declare a new type using the definition of MetricOne, // the result of this is that M will have the same structure // as MetricOne but none of its methods (this avoids recursive @@ -232,8 +233,8 @@ func (m MetricOne) MarshalJSON() ([]byte, error) { } // move map elements to slice - channels := make([]*statusChannel, 0, len(m.ChannelsInfo)) - for _, channel := range m.ChannelsInfo { + channels := make([]*statusChannel, 0, len(instance.ChannelsInfo)) + for _, channel := range instance.ChannelsInfo { channels = append(channels, channel) } @@ -241,12 +242,12 @@ func (m MetricOne) MarshalJSON() ([]byte, error) { // For the embedded M field use a converted instance of the receiver. // For the ChannelsInfo field use the channels slice. return json.Marshal(T{ - M: M(m), + M: M(instance), ChannelsInfo: channels, }) } -// Same as MarshalJSON but in reverse. +// UnmarshalJSON Same as MarshalJSON but in reverse. func (instance *MetricOne) UnmarshalJSON(data []byte) error { var jsonMap map[string]interface{} if err := json.Unmarshal(data, &jsonMap); err != nil { @@ -280,7 +281,7 @@ func (instance *MetricOne) UnmarshalJSON(data []byte) error { return nil } -// This method is required by the +// NewMetricOne This method is required by the interface func NewMetricOne(nodeId string, sysInfo sysinfo.HostInfo, storage db.PluginDatabase) *MetricOne { return &MetricOne{ id: 1, @@ -313,9 +314,9 @@ func (instance *MetricOne) MetricName() *string { } // Migrate from a payload format to another, with the help of the version number. -// Note that it is required implementing a required strategy only if the some -// properties will change during the time, if somethings it is only add, we -// don't have anythings to migrate. +// Note that it is required implementing a required strategy only if the same +// properties will change during the time, if something's it is only add, we +// don't have anything's to migrate. func (instance *MetricOne) Migrate(payload map[string]interface{}) error { version, found := payload["version"] @@ -339,7 +340,7 @@ func (instance *MetricOne) Migrate(payload map[string]interface{}) error { return nil } -// Generic Plugin callback that it is ran each time that the plugin need to recording a new event. +// Generic Plugin callback that it is run each time that the plugin need to recording a new event. func (instance *MetricOne) onEvent(nameEvent string, lightning *glightning.Lightning) (*status, error) { listFunds, err := lightning.ListFunds() if err != nil { @@ -399,7 +400,7 @@ func (instance *MetricOne) onEvent(nameEvent string, lightning *glightning.Light return status, nil } -// One time callback called from the lightning implementation +// OnInit One time callback called from the lightning implementation func (instance *MetricOne) OnInit(lightning *glightning.Lightning) error { getInfo, err := lightning.GetInfo() if err != nil { @@ -425,7 +426,7 @@ func (instance *MetricOne) OnInit(lightning *glightning.Lightning) error { instance.lastCheck = status.Timestamp } - //FIXME: We could use a set datastructure + //FIXME: We could use a set datastructures instance.Address = make([]*NodeAddress, 0) for _, address := range getInfo.Addresses { nodeAddress := &NodeAddress{ @@ -452,21 +453,21 @@ func (instance *MetricOne) Update(lightning *glightning.Lightning) error { return instance.MakePersistent() } -func (metric *MetricOne) UpdateWithMsg(message *Msg, +func (instance *MetricOne) UpdateWithMsg(message *Msg, lightning *glightning.Lightning) error { - return fmt.Errorf("Method not supported") + return fmt.Errorf("method not supported") } func (instance *MetricOne) MakePersistent() error { - json, err := instance.ToJSON() + instanceJson, err := instance.ToJSON() if err != nil { log.GetInstance().Error(fmt.Sprintf("JSON error %s", err)) return err } - return instance.Storage.StoreMetricOneSnapshot(instance.lastCheck, &json) + return instance.Storage.StoreMetricOneSnapshot(instance.lastCheck, &instanceJson) } -// here the message is not useful, but we keep it only for future evolution +// FIXME: the message is not useful, but we keep it only for future evolution // or we will remove it from here. func (instance *MetricOne) OnClose(msg *Msg, lightning *glightning.Lightning) error { log.GetInstance().Debug("On close event on metrics called") @@ -495,6 +496,7 @@ func (instance *MetricOne) OnClose(msg *Msg, lightning *glightning.Lightning) er return instance.MakePersistent() } +// ToJSON Convert the MetricOne structure to a JSON string. func (instance *MetricOne) ToJSON() (string, error) { json, err := json.Marshal(&instance) if err != nil { @@ -504,7 +506,7 @@ func (instance *MetricOne) ToJSON() (string, error) { return string(json), nil } -// Contact the server and make an init the node. +// InitOnRepo Contact the server and make an init the node. func (instance *MetricOne) InitOnRepo(client *graphql.Client, lightning *glightning.Lightning) error { log.GetInstance().Info("Init plugin on repository") err := client.GetNodeMetadata(instance.NodeID, instance.Network) @@ -546,7 +548,7 @@ func (instance *MetricOne) InitOnRepo(client *graphql.Client, lightning *glightn } } -// Contact the server and make an update request +// UploadOnRepo Contact the server and make an update request func (instance *MetricOne) UploadOnRepo(client *graphql.Client, lightning *glightning.Lightning) error { payload, err := instance.ToJSON() if err != nil { @@ -572,6 +574,41 @@ func (instance *MetricOne) UploadOnRepo(client *graphql.Client, lightning *gligh return nil } +func (instance *MetricOne) checkChannelInCache(lightning *glightning.Lightning, channelID string) (*cache.NodeInfoCache, error) { + var nodeInfo *cache.NodeInfoCache + if cache.GetInstance().IsInCache(channelID) { + bytes, err := cache.GetInstance().GetFromCache(channelID) + if err != nil { + log.GetInstance().Errorf("Error %s:", err) + return nil, err + } + if err := json.Unmarshal(bytes, nodeInfo); err != nil { + log.GetInstance().Errorf("Error %s", err) + return nil, err + } + } + + if nodeInfo == nil { + // FIXME: we need some method to update the cache and prefilled at the startup. + node, err := lightning.GetNode(channelID) + if err != nil { + log.GetInstance().Error(fmt.Sprintf("Error in command listNodes in makeChannelsSummary: %s", err)) + // We admit this error, a node can be forgotten by the gossip if it is offline for long time. + return nil, err + } + nodeInfo = &cache.NodeInfoCache{ + ID: node.Id, + Alias: node.Alias, + Color: node.Color, + Features: node.Features, + } + if err := cache.GetInstance().PutToCache(nodeInfo.ID, nodeInfo); err != nil { + log.GetInstance().Errorf("%s", err) + } + } + return nodeInfo, nil +} + // Make a summary of all the channels information that the node have a channels with. func (instance *MetricOne) makeChannelsSummary(lightning *glightning.Lightning, channels []*glightning.FundingChannel) (*ChannelsSummary, error) { channelsSummary := &ChannelsSummary{ @@ -595,17 +632,14 @@ func (instance *MetricOne) makeChannelsSummary(lightning *glightning.Lightning, ChannelId: channel.ShortChannelId, State: channel.State, } - // FIXME: With too many channels this can require to many node request! - // this can avoid to get all node node known, but this also can have a very big response. - node, err := lightning.GetNode(channel.Id) + + nodeInfo, err := instance.checkChannelInCache(lightning, channel.Id) if err != nil { - log.GetInstance().Error(fmt.Sprintf("Error in command listNodes in makeChannelsSummary: %s", err)) - // We admit this error, a node can be forgotten by the gossip if it is offline for long time. continue } channelsSummary.TotChannels++ - channelSummary.Alias = node.Alias - channelSummary.Color = node.Color + channelSummary.Alias = nodeInfo.Alias + channelSummary.Color = nodeInfo.Color summary = append(summary, channelSummary) } channelsSummary.Summary = summary @@ -813,7 +847,10 @@ func (instance *MetricOne) getChannelInfo(lightning *glightning.Lightning, } for _, subChannel := range subChannels { - nodeInfo, err := lightning.GetNode(channel.Id) + nodeInfo, err := instance.checkChannelInCache(lightning, channel.Id) + if err != nil { + continue + } // Init the default data here channelInfo := &ChannelInfo{ NodeId: channel.Id, @@ -914,7 +951,6 @@ func (instance *MetricOne) getChannelInfo(lightning *glightning.Lightning, } result[channelInfo.Direction] = channelInfo } - //TODO Adding support for the dual founding channels. return result, nil }