Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move TelemetryURL to SRV Record #374

Merged
merged 30 commits into from
Oct 16, 2019
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
3b09faa
Replace old simple docker images with latest rel/stable
Aug 27, 2019
c32745d
Add custom image defaulting to testnet
Aug 27, 2019
2ff04ed
Refreshable telemetry URL.
winder Sep 20, 2019
c8f711f
Make sure any queued up events will be flushed once the telemetry URL…
winder Sep 20, 2019
642ee79
Merge branch 'updateDockerHub' of https://github.com/Karmastic/go-alg…
winder Sep 20, 2019
a9c24a9
Fix test build.
winder Sep 20, 2019
7a8557e
Lookup 'telemetry.<dnsBootstrap>' to get the telemetry URL, and updat…
winder Sep 24, 2019
bf0083b
Make WebsocketNetwork.readFromBootstrap a public helper method.
winder Sep 24, 2019
0bd94c4
Print the heartbeat event on the first iteration.
winder Sep 27, 2019
78043f8
Enable SRV lookup in algoh, add periodic ready check to async event h…
winder Oct 4, 2019
466d0f7
Merge remote-tracking branch 'upstream/master' into will/srv
winder Oct 7, 2019
5014b1d
Minor cleanup
winder Oct 7, 2019
ffdcab1
No need to sanitize telemetry config, it's only saved during startup …
winder Oct 7, 2019
3065658
Fix import.
winder Oct 7, 2019
b468b00
Fix 'make sanity'
winder Oct 7, 2019
a220e4f
URL to URI.
winder Oct 7, 2019
773bed4
Fix make lint
winder Oct 7, 2019
dbd5820
Use a channel to trigger the logger events to drain instead of a ticker.
winder Oct 9, 2019
81e0694
Create SRV Update Service.
winder Oct 10, 2019
a86cdf7
Shutdown handling in telemetry URI update service.
winder Oct 10, 2019
324a5c5
Use separate services for telemetry and gossip SRV records.
winder Oct 11, 2019
53010fc
Fix make fmt and make lint
winder Oct 11, 2019
cab4f86
Make telemetry resilient to invalid uri's.
winder Oct 15, 2019
3275ea4
Merge remote-tracking branch 'upstream/master' into will/srv
winder Oct 15, 2019
0c85de6
This changed...
winder Oct 16, 2019
8fc3aa0
Rename method.
winder Oct 16, 2019
31d2d12
Fix telemetry tests.
winder Oct 16, 2019
7400a7f
Missing import
winder Oct 16, 2019
ed48ebb
Fix make lint
winder Oct 16, 2019
b9c440c
Use the correct errors package.
winder Oct 16, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 55 additions & 28 deletions cmd/algod/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/logging/telemetryspec"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/tools/network"
"github.com/algorand/go-algorand/util/metrics"
"github.com/algorand/go-algorand/util/tokens"
)
Expand Down Expand Up @@ -175,34 +176,6 @@ func main() {
if err != nil {
fmt.Fprintln(os.Stdout, "error creating telemetry hook", err)
}

if log.GetTelemetryEnabled() {
currentVersion := config.GetCurrentVersion()
startupDetails := telemetryspec.StartupEventDetails{
Version: currentVersion.String(),
CommitHash: currentVersion.CommitHash,
Branch: currentVersion.Branch,
Channel: currentVersion.Channel,
InstanceHash: crypto.Hash([]byte(absolutePath)).String(),
}

log.EventWithDetails(telemetryspec.ApplicationState, telemetryspec.StartupEvent, startupDetails)
// Send a heartbeat event every 10 minutes as a sign of life
ticker := time.NewTicker(10 * time.Minute)
go func() {
values := make(map[string]string)
for {
metrics.DefaultRegistry().AddMetrics(values)

heartbeatDetails := telemetryspec.HeartbeatEventDetails{
Metrics: values,
}

log.EventWithDetails(telemetryspec.ApplicationState, telemetryspec.HeartbeatEvent, heartbeatDetails)
<-ticker.C
}
}()
}
}
}

Expand Down Expand Up @@ -234,6 +207,7 @@ func main() {
}

// If overriding peers, disable SRV lookup
telemetryDNSBootstrapID := cfg.DNSBootstrapID
var peerOverrideArray []string
if *peerOverride != "" {
peerOverrideArray = strings.Split(*peerOverride, ";")
Expand Down Expand Up @@ -282,6 +256,59 @@ func main() {
}
fmt.Fprintf(os.Stdout, "Deadlock detection is set to: %s (Default state is '%s')\n", deadlockState, config.DefaultDeadlock)

if log.GetTelemetryEnabled() {
done := make(chan struct{})
defer close(done)

// Make a copy of config and reset DNSBootstrapID in case it was disabled.
cfgCopy := cfg
cfgCopy.DNSBootstrapID = telemetryDNSBootstrapID

// If the telemetry URI is not set, periodically check SRV records for new telemetry URI
if log.GetTelemetryURI() == "" {
network.StartTelemetryURIUpdateService(time.Minute, cfg, s.Genesis.Network, log, done)
}

currentVersion := config.GetCurrentVersion()
startupDetails := telemetryspec.StartupEventDetails{
Version: currentVersion.String(),
CommitHash: currentVersion.CommitHash,
Branch: currentVersion.Branch,
Channel: currentVersion.Channel,
InstanceHash: crypto.Hash([]byte(absolutePath)).String(),
}

log.EventWithDetails(telemetryspec.ApplicationState, telemetryspec.StartupEvent, startupDetails)

// Send a heartbeat event every 10 minutes as a sign of life
go func() {
winder marked this conversation as resolved.
Show resolved Hide resolved
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()

sendHeartbeat := func() {
values := make(map[string]string)
metrics.DefaultRegistry().AddMetrics(values)

heartbeatDetails := telemetryspec.HeartbeatEventDetails{
Metrics: values,
}

log.EventWithDetails(telemetryspec.ApplicationState, telemetryspec.HeartbeatEvent, heartbeatDetails)
}

// Send initial heartbeat, followed by one every 10 minutes.
sendHeartbeat()
for {
select {
case <-ticker.C:
sendHeartbeat()
case <-done:
return
}
}
}()
}

s.Start()
}

Expand Down
41 changes: 28 additions & 13 deletions cmd/algoh/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ import (
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/daemon/algod/api/client"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/logging/telemetryspec"
"github.com/algorand/go-algorand/nodecontrol"
"github.com/algorand/go-algorand/shared/algoh"
"github.com/algorand/go-algorand/tools/network"
"github.com/algorand/go-algorand/util"
)

Expand Down Expand Up @@ -65,7 +67,7 @@ func main() {
flag.Parse()
nc := getNodeController()

genesisID, err := nc.GetGenesisID()
genesis, err := nc.GetGenesis()
if err != nil {
fmt.Fprintln(os.Stdout, "error loading telemetry config", err)
return
Expand Down Expand Up @@ -100,8 +102,9 @@ func main() {
}
validateConfig(algohConfig)

done := make(chan struct{})
log := logging.Base()
configureLogging(genesisID, log, absolutePath)
configureLogging(genesis, log, absolutePath, done)
defer log.CloseTelemetry()

exeDir, err = util.ExeDir()
Expand All @@ -111,7 +114,6 @@ func main() {

var errorOutput stdCollector
var output stdCollector
done := make(chan struct{})
go func() {
args := make([]string, len(os.Args)-1)
copy(args, os.Args[1:]) // Copy our arguments (skip the executable)
Expand Down Expand Up @@ -249,7 +251,7 @@ func getNodeController() nodecontrol.NodeController {
return nc
}

func configureLogging(genesisID string, log logging.Logger, rootPath string) {
func configureLogging(genesis bookkeeping.Genesis, log logging.Logger, rootPath string, abort chan struct{}) {
log = logging.Base()

liveLog := fmt.Sprintf("%s/host.log", rootPath)
Expand All @@ -262,7 +264,7 @@ func configureLogging(genesisID string, log logging.Logger, rootPath string) {
log.SetJSONFormatter()
log.SetLevel(logging.Debug)

initTelemetry(genesisID, log, rootPath)
initTelemetry(genesis, log, rootPath, abort)

// if we have the telemetry enabled, we want to use it's sessionid as part of the
// collected metrics decorations.
Expand All @@ -271,12 +273,12 @@ func configureLogging(genesisID string, log logging.Logger, rootPath string) {
fmt.Fprintln(writer, "++++++++++++++++++++++++++++++++++++++++")
}

func initTelemetry(genesisID string, log logging.Logger, dataDirectory string) {
func initTelemetry(genesis bookkeeping.Genesis, log logging.Logger, dataDirectory string, abort chan struct{}) {
// Enable telemetry hook in daemon to send logs to cloud
// If ALGOTEST env variable is set, telemetry is disabled - allows disabling telemetry for tests
isTest := os.Getenv("ALGOTEST") != ""
if !isTest {
telemetryConfig, err := logging.EnsureTelemetryConfig(&dataDirectory, genesisID)
telemetryConfig, err := logging.EnsureTelemetryConfig(&dataDirectory, genesis.ID())
if err != nil {
fmt.Fprintln(os.Stdout, "error loading telemetry config", err)
return
Expand All @@ -291,13 +293,25 @@ func initTelemetry(genesisID string, log logging.Logger, dataDirectory string) {
fmt.Fprintln(os.Stdout, "error creating telemetry hook", err)
return
}
// For privacy concerns, we don't want to provide the full data directory to telemetry.
// But to be useful where multiple nodes are installed for convenience, we should be
// able to discriminate between instances with the last letter of the path.
if dataDirectory != "" {
dataDirectory = dataDirectory[len(dataDirectory)-1:]
}

if log.GetTelemetryEnabled() {
cfg, err := config.LoadConfigFromDisk(dataDirectory)
if err != nil && !os.IsNotExist(err) {
log.Fatalf("Cannot load config: %v", err)
}

// If the telemetry URI is not set, periodically check SRV records for new telemetry URI
if log.GetTelemetryURI() == "" {
network.StartTelemetryURIUpdateService(time.Minute, cfg, genesis.Network, log, abort)
}

// For privacy concerns, we don't want to provide the full data directory to telemetry.
// But to be useful where multiple nodes are installed for convenience, we should be
// able to discriminate between instances with the last letter of the path.
if dataDirectory != "" {
dataDirectory = dataDirectory[len(dataDirectory)-1:]
}

currentVersion := config.GetCurrentVersion()
startupDetails := telemetryspec.StartupEventDetails{
Version: currentVersion.String(),
Expand Down Expand Up @@ -360,3 +374,4 @@ func validateConfig(config algoh.HostConfig) {
reportErrorf("Config.DeadManTimeSec should be >= 30 seconds (set to %v)\n", config.DeadManTimeSec)
}
}

6 changes: 5 additions & 1 deletion libgoal/libgoal.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,11 @@ func (c *Client) getAlgodClient() (algodclient.RestClient, error) {
}

func (c *Client) ensureGenesisID() (string, error) {
return c.nc.GetGenesisID()
genesis, err := c.nc.GetGenesis()
if err != nil {
return "", err
}
return genesis.ID(), nil
}

// GenesisID fetches the genesis ID for the running algod node
Expand Down
12 changes: 12 additions & 0 deletions logging/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ type Logger interface {
AddHook(hook logrus.Hook)

EnableTelemetry(cfg TelemetryConfig) error
UpdateTelemetryURI(uri string)
GetTelemetryEnabled() bool
Metrics(category telemetryspec.Category, metrics telemetryspec.MetricDetails, details interface{})
Event(category telemetryspec.Category, identifier telemetryspec.Event)
Expand All @@ -162,6 +163,7 @@ type Logger interface {
GetTelemetrySession() string
GetTelemetryHostName() string
GetInstanceName() string
GetTelemetryURI() string
CloseTelemetry()
}

Expand Down Expand Up @@ -370,6 +372,12 @@ func (l logger) EnableTelemetry(cfg TelemetryConfig) (err error) {
return EnableTelemetry(cfg, &l)
}

func (l logger) UpdateTelemetryURI(uri string) {
if l.loggerState.telemetry.hook.UpdateHookURI(uri) {
telemetryConfig.URI = uri
}
}

func (l logger) GetTelemetryEnabled() bool {
return l.loggerState.telemetry != nil
}
Expand All @@ -386,6 +394,10 @@ func (l logger) GetInstanceName() string {
return telemetryConfig.getInstanceName()
}

func (l logger) GetTelemetryURI() string {
return telemetryConfig.URI
}

func (l logger) Metrics(category telemetryspec.Category, metrics telemetryspec.MetricDetails, details interface{}) {
if l.loggerState.telemetry != nil {
l.loggerState.telemetry.logMetrics(l, category, metrics, details)
Expand Down
19 changes: 18 additions & 1 deletion logging/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,23 @@ func enableTelemetryState(telemetry *telemetryState, l *logger) {
l.setOutput(telemetry.wrapOutput(l.getOutput()))
}

func makeLevels(min logrus.Level) []logrus.Level {
levels := []logrus.Level{}
for _, l := range []logrus.Level{
logrus.PanicLevel,
logrus.FatalLevel,
logrus.ErrorLevel,
logrus.WarnLevel,
logrus.InfoLevel,
logrus.DebugLevel,
} {
if l <= min {
levels = append(levels, l)
}
}
return levels
}

func makeTelemetryState(cfg TelemetryConfig, hookFactory hookFactory) (*telemetryState, error) {
history := createLogBuffer(cfg.LogHistoryDepth)
if cfg.SessionGUID == "" {
Expand All @@ -64,7 +81,7 @@ func makeTelemetryState(cfg TelemetryConfig, hookFactory hookFactory) (*telemetr

telemetry := &telemetryState{
history,
createAsyncHook(hook, 32, 100),
createAsyncHookLevels(hook, 32, 100, makeLevels(cfg.MinLogLevel)),
}
return telemetry, nil
}
Expand Down
3 changes: 3 additions & 0 deletions logging/telemetryCommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ type asyncTelemetryHook struct {
entries chan *logrus.Entry
quit chan struct{}
maxQueueDepth int
levels []logrus.Level
ready bool
urlUpdate chan bool
}

type hookFactory func(cfg TelemetryConfig) (logrus.Hook, error)
16 changes: 1 addition & 15 deletions logging/telemetryConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ var TelemetryConfigFilename = "logging.config"

const ipv6AddressLength = 39

func elasticsearchEndpoint() string {
return "https://1ae9f9654b25441090fe5c48c833b95a.us-east-1.aws.found.io:9243"
}

// TelemetryOverride Determines whether an override value is set and what it's value is.
// The first return value is whether an override variable is found, if it is, the second is the override value.
func TelemetryOverride(env string) bool {
Expand All @@ -65,7 +61,7 @@ func createTelemetryConfig() TelemetryConfig {
return TelemetryConfig{
Enable: enable,
GUID: uuid.NewV4().String(),
URI: elasticsearchEndpoint(),
URI: "",
MinLogLevel: logrus.WarnLevel,
ReportHistoryLevel: logrus.WarnLevel,
LogHistoryDepth: 100,
Expand All @@ -90,11 +86,6 @@ func (cfg TelemetryConfig) Save(configPath string) error {
sanitizedCfg := cfg
sanitizedCfg.FilePath = ""

// If using default URI, don't save that - so we pick up the current default in the future
if sanitizedCfg.URI == elasticsearchEndpoint() {
sanitizedCfg.URI = ""
}

enc := json.NewEncoder(f)
err = enc.Encode(sanitizedCfg)
return err
Expand Down Expand Up @@ -142,11 +133,6 @@ func loadTelemetryConfig(path string) (TelemetryConfig, error) {
cfg := createTelemetryConfig()
dec := json.NewDecoder(f)
err = dec.Decode(&cfg)
if err == nil {
if len(cfg.URI) == 0 {
cfg.URI = elasticsearchEndpoint()
}
}
cfg.FilePath = path

// Sanitize user-defined name.
Expand Down
Loading