Skip to content

Commit

Permalink
Refactor to update state when cache or SLURM changes
Browse files Browse the repository at this point in the history
Before this commit the new stayrtr state was only recalculated
when the validated cache changed.

This commit refactors the update loop and separates the actual
update from pulling the updated cache. The update is triggered
when the SLURM file changes _or_ when the validated cache changes.

Should fix cloudflare/gortr#95 and make
further changes that trigger an update when an update in the cache
has expired easier.
  • Loading branch information
ties committed Aug 1, 2021
1 parent 74bd75c commit acbf96e
Showing 1 changed file with 66 additions and 51 deletions.
117 changes: 66 additions & 51 deletions cmd/stayrtr/stayrtr.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,41 +248,12 @@ func (e IdenticalFile) Error() string {
return fmt.Sprintf("File %s is identical to the previous version", e.File)
}

func (s *state) updateFile(file string) error {
// Update the state based on the current slurm file and data.
func (s *state) updateFromNewState() error {
sessid, _ := s.server.GetSessionId(nil)

log.Debugf("Refreshing cache from %s", file)

s.lastts = time.Now().UTC()
data, code, lastrefresh, err := s.fetchConfig.FetchFile(file)
if err != nil {
return err
}
if lastrefresh {
LastRefresh.WithLabelValues(file).Set(float64(s.lastts.UnixNano() / 1e9))
}
if code != -1 {
RefreshStatusCode.WithLabelValues(file, fmt.Sprintf("%d", code)).Inc()
}

hsum, _ := checkFile(data)
if s.lasthash != nil {
cres := bytes.Compare(s.lasthash, hsum)
if cres == 0 {
return IdenticalFile{File: file}
}
}

s.lastchange = time.Now().UTC()
s.lastdata = data

vrplistjson, err := decodeJSON(s.lastdata)
if err != nil {
return err
}

if s.checktime {
buildtime, err := time.Parse(time.RFC3339, vrplistjson.Metadata.Buildtime)
buildtime, err := time.Parse(time.RFC3339, s.lastdata.Metadata.Buildtime)
if err != nil {
return err
}
Expand All @@ -292,7 +263,7 @@ func (s *state) updateFile(file string) error {
}
}

vrpsjson := vrplistjson.Data
vrpsjson := s.lastdata.Data
if s.slurm != nil {
kept, removed := s.slurm.FilterOnVRPs(vrpsjson)
asserted := s.slurm.AssertVRPs()
Expand All @@ -301,13 +272,9 @@ func (s *state) updateFile(file string) error {
}

vrps, count, countv4, countv6 := processData(vrpsjson)
if err != nil {
return err
}

log.Infof("New update (%v uniques, %v total prefixes). %v bytes. Updating sha256 hash %x -> %x",
len(vrps), count, len(s.lastconverted), s.lasthash, hsum)
s.lasthash = hsum
log.Infof("New update (%v uniques, %v total prefixes). %v bytes.",
len(vrps), count)

s.server.AddVRPs(vrps)

Expand All @@ -322,7 +289,7 @@ func (s *state) updateFile(file string) error {
s.exported = prefixfile.VRPList{
Metadata: prefixfile.MetaData{
Counts: len(vrpsjson),
Buildtime: vrplistjson.Metadata.Buildtime,
Buildtime: s.lastdata.Metadata.Buildtime,
},
Data: vrpsjson,
}
Expand All @@ -339,16 +306,54 @@ func (s *state) updateFile(file string) error {
countv6_dup++
}
}
s.metricsEvent.UpdateMetrics(countv4, countv6, countv4_dup, countv6_dup, s.lastchange, s.lastts, file)
s.metricsEvent.UpdateMetrics(countv4, countv6, countv4_dup, countv6_dup, s.lastchange, s.lastts, *CacheBin)
}

return nil
}

func (s *state) updateSlurm(file string) error {
func (s *state) updateFile(file string) (bool, error) {
log.Debugf("Refreshing cache from %s", file)

s.lastts = time.Now().UTC()
data, code, lastrefresh, err := s.fetchConfig.FetchFile(file)
if err != nil {
return false, err
}
if lastrefresh {
LastRefresh.WithLabelValues(file).Set(float64(s.lastts.UnixNano() / 1e9))
}
if code != -1 {
RefreshStatusCode.WithLabelValues(file, fmt.Sprintf("%d", code)).Inc()
}

hsum, _ := checkFile(data)
if s.lasthash != nil {
cres := bytes.Compare(s.lasthash, hsum)
if cres == 0 {
return false, IdenticalFile{File: file}
}
}

log.Infof("new cache file: Updating sha256 hash %x -> %x", s.lasthash, hsum)

vrplistjson, err := decodeJSON(data)
if err != nil {
return false, err
}

s.lasthash = hsum
s.lastchange = time.Now().UTC()
s.lastdata = vrplistjson

return true, nil
}

func (s *state) updateSlurm(file string) (bool, error) {
log.Debugf("Refreshing slurm from %v", file)
data, code, lastrefresh, err := s.fetchConfig.FetchFile(file)
if err != nil {
return err
return false, err
}
if lastrefresh {
LastRefresh.WithLabelValues(file).Set(float64(s.lastts.UnixNano() / 1e9))
Expand All @@ -361,10 +366,10 @@ func (s *state) updateSlurm(file string) error {

slurm, err := prefixfile.DecodeJSONSlurm(buf)
if err != nil {
return err
return false, err
}
s.slurm = slurm
return nil
return true, nil
}

func (s *state) routineUpdate(file string, interval int, slurmFile string) {
Expand All @@ -379,8 +384,10 @@ func (s *state) routineUpdate(file string, interval int, slurmFile string) {
log.Debug("Received HUP signal")
}
delay.Stop()
slurmNotPresentOrUpdated := false
if slurmFile != "" {
err := s.updateSlurm(slurmFile)
var err error
slurmNotPresentOrUpdated, err = s.updateSlurm(slurmFile)
if err != nil {
switch err.(type) {
case utils.HttpNotModified:
Expand All @@ -392,7 +399,7 @@ func (s *state) routineUpdate(file string, interval int, slurmFile string) {
}
}
}
err := s.updateFile(file)
cacheUpdated, err := s.updateFile(file)
if err != nil {
switch err.(type) {
case utils.HttpNotModified:
Expand All @@ -405,6 +412,15 @@ func (s *state) routineUpdate(file string, interval int, slurmFile string) {
log.Errorf("Error updating: %v", err)
}
}

// Only process the first time after there is either a cache or SLURM
// update.
if (cacheUpdated || slurmNotPresentOrUpdated) {
err := s.updateFromNewState()
if err != nil {
log.Errorf("Error updating from new state: %v", err)
}
}
}
}

Expand All @@ -417,8 +433,7 @@ func (s *state) exporter(wr http.ResponseWriter, r *http.Request) {
}

type state struct {
lastdata []byte
lastconverted []byte
lastdata *prefixfile.VRPList
lasthash []byte
lastchange time.Time
lastts time.Time
Expand Down Expand Up @@ -532,7 +547,7 @@ func main() {
log.Fatalf("Specify at least a bind address")
}

err := s.updateFile(*CacheBin)
_, err := s.updateFile(*CacheBin)
if err != nil {
switch err.(type) {
case utils.HttpNotModified:
Expand All @@ -548,7 +563,7 @@ func main() {

slurmFile := *Slurm
if slurmFile != "" {
err := s.updateSlurm(slurmFile)
_, err := s.updateSlurm(slurmFile)
if err != nil {
switch err.(type) {
case utils.HttpNotModified:
Expand Down

0 comments on commit acbf96e

Please sign in to comment.