Skip to content

Commit

Permalink
adding multiple keys logic in analytics cache purger for slave gatewa…
Browse files Browse the repository at this point in the history
…ys (#3487)

<!-- Provide a general summary of your changes in the Title above -->

## Description
<!-- Describe your changes in detail -->
Extends #3483
Modified the logic for rpc_analytics_purger so it can read from keys from multiple keys. It's coded in full backward-compatible mode, which means that it's going to look for `tyk-system-analytics` key and then for `tyk-system-analytics_X `where X goes from 0 to 9.

After reading, it will send the analytics records to sink through RPC. If tyk-sink has enable_multiple_analytics_keys = true, it will write the records in multiple keys and if it's false, it will write them in only one key.

## Related Issue
<!-- This project only accepts pull requests related to open issues -->
<!-- If suggesting a new feature or change, please discuss it in an issue first -->
<!-- If fixing a bug, there should be an issue describing it with steps to reproduce -->
<!-- Please link to the issue here -->
https://tyktech.atlassian.net/browse/TT-1574
## Motivation and Context
<!-- Why is this change required? What problem does it solve? -->
Reduce Redis overload and have a better analytics distribution across the cluster for slave gateways.

## How This Has Been Tested
<!-- Please describe in detail how you tested your changes -->
<!-- Include details of your testing environment, and the tests you ran to see how your change affects other areas of
the code, etc. -->

## Screenshots (if appropriate)

## Types of changes
<!-- What types of changes does your code introduce? Put an `x` in all the boxes that apply: -->
- [ ] Bug fix (non-breaking change which fixes an issue)
- [X] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)
- [ ] Refactoring or add test (improvements in base code or adds test coverage to functionality)

## Checklist
<!-- Go over all the following points, and put an `x` in all the boxes that apply -->
<!-- If you're unsure about any of these, don't hesitate to ask; we're here to help! -->
- [X] Make sure you are requesting to **pull a topic/feature/bugfix branch** (right side). If pulling from your own
      fork, don't request your `master`!
- [X] Make sure you are making a pull request against the **`master` branch** (left side). Also, you should start
      *your branch* off *our latest `master`*.
- [] My change requires a change to the documentation.
  - [ ] If you've changed APIs, describe what needs to be updated in the documentation.
  - [ ] If new config option added, ensure that it can be set via ENV variable
- [ ] I have updated the documentation accordingly.
- [ ] Modules and vendor dependencies have been updated; run `go mod tidy && go mod vendor`
- [ ] When updating library version must provide reason/explanation for this update.
- [ ] I have added tests to cover my changes.
- [X] All new and existing tests passed.
- [X] Check your code additions will not fail linting checks:
  - [X] `go fmt -s`
  - [X] `go vet`

(cherry picked from commit d0cf603)
  • Loading branch information
tbuchaillot authored and Tyk Bot committed Mar 11, 2021
1 parent 6333b8a commit e0e4f2e
Showing 1 changed file with 34 additions and 23 deletions.
57 changes: 34 additions & 23 deletions rpc/rpc_analytics_purger.go
Expand Up @@ -3,6 +3,7 @@ package rpc
import (
"context"
"encoding/json"
"fmt"
"time"

msgpack "gopkg.in/vmihailenco/msgpack.v2"
Expand Down Expand Up @@ -55,7 +56,7 @@ type GeoData struct {
} `maxminddb:"location"`
}

const analyticsKeyName = "tyk-system-analytics"
const ANALYTICS_KEYNAME = "tyk-system-analytics"

// RPCPurger will purge analytics data into a Mongo database, requires that the Mongo DB string is specified
// in the Config object
Expand Down Expand Up @@ -111,31 +112,41 @@ func (r *Purger) PurgeCache() {
return
}

analyticsValues := r.Store.GetAndDeleteSet(analyticsKeyName)
if len(analyticsValues) == 0 {
return
}
keys := make([]interface{}, len(analyticsValues))

for i, v := range analyticsValues {
decoded := AnalyticsRecord{}
if err := msgpack.Unmarshal([]byte(v.(string)), &decoded); err != nil {
Log.WithError(err).Error("Couldn't unmarshal analytics data")
for i := -1; i < 10; i++ {
var analyticsKeyName string
if i == -1 {
//if it's the first iteration, we look for tyk-system-analytics to maintain backwards compatibility or if analytics_config.enable_multiple_analytics_keys is disabled in the gateway
analyticsKeyName = ANALYTICS_KEYNAME
} else {
Log.WithField("decoded", decoded).Debug("Decoded Record")
keys[i] = decoded
analyticsKeyName = fmt.Sprintf("%v_%v", ANALYTICS_KEYNAME, i)
}
}

data, err := json.Marshal(keys)
if err != nil {
Log.WithError(err).Error("Failed to marshal analytics data")
return
}
analyticsValues := r.Store.GetAndDeleteSet(analyticsKeyName)
if len(analyticsValues) == 0 {
continue
}
keys := make([]interface{}, len(analyticsValues))

for i, v := range analyticsValues {
decoded := AnalyticsRecord{}
if err := msgpack.Unmarshal([]byte(v.(string)), &decoded); err != nil {
Log.WithError(err).Error("Couldn't unmarshal analytics data")
} else {
Log.WithField("decoded", decoded).Debug("Decoded Record")
keys[i] = decoded
}
}

data, err := json.Marshal(keys)
if err != nil {
Log.WithError(err).Error("Failed to marshal analytics data")
return
}

// Send keys to RPC
if _, err := FuncClientSingleton("PurgeAnalyticsData", string(data)); err != nil {
EmitErrorEvent(FuncClientSingletonCall, "PurgeAnalyticsData", err)
Log.Warn("Failed to call purge, retrying: ", err)
// Send keys to RPC
if _, err := FuncClientSingleton("PurgeAnalyticsData", string(data)); err != nil {
EmitErrorEvent(FuncClientSingletonCall, "PurgeAnalyticsData", err)
Log.Warn("Failed to call purge, retrying: ", err)
}
}
}

0 comments on commit e0e4f2e

Please sign in to comment.