Skip to content

Commit

Permalink
Add monitor http endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
kyhavlov committed Nov 16, 2016
1 parent 8ef76ec commit db3ff8b
Show file tree
Hide file tree
Showing 9 changed files with 234 additions and 2 deletions.
35 changes: 35 additions & 0 deletions api/agent.go
@@ -1,6 +1,7 @@
package api

import (
"bufio"
"fmt"
)

Expand Down Expand Up @@ -410,3 +411,37 @@ func (a *Agent) DisableNodeMaintenance() error {
resp.Body.Close()
return nil
}

// Monitor returns a channel which will receive streaming logs from the agent.
// If loglevel is non-empty it is sent as the "loglevel" query parameter.
// Providing a non-nil stopCh can be used to close the connection and stop the
// log stream.
//
// The returned channel is closed when the stream is stopped through stopCh or
// when the response body ends (EOF or read error), so callers can detect the
// end of the stream with the two-value receive form.
//
// NOTE: a stop request is only observed between reads; a read that is blocked
// waiting for the next log line is not interrupted by stopCh.
func (a *Agent) Monitor(loglevel string, stopCh chan struct{}) (chan string, error) {
	r := a.c.newRequest("GET", "/v1/agent/monitor")
	if loglevel != "" {
		r.params.Add("loglevel", loglevel)
	}
	_, resp, err := requireOK(a.c.doRequest(r))
	if err != nil {
		return nil, err
	}

	logCh := make(chan string, 64)
	go func() {
		defer resp.Body.Close()
		defer close(logCh)

		scanner := bufio.NewScanner(resp.Body)
		for {
			// Honor a stop request before starting the next read.
			select {
			case <-stopCh:
				return
			default:
			}

			// Scan reports false on EOF or on a read error. Nothing more
			// will ever arrive, so exit instead of spinning in the loop.
			if !scanner.Scan() {
				return
			}

			// Deliver the line, but do not get stuck on a full channel if
			// the caller has asked us to stop. A nil stopCh never fires,
			// which keeps the plain blocking send for that case.
			select {
			case logCh <- scanner.Text():
			case <-stopCh:
				return
			}
		}
	}()

	return logCh, nil
}
24 changes: 24 additions & 0 deletions api/agent_test.go
Expand Up @@ -3,6 +3,7 @@ package api
import (
"strings"
"testing"
"time"
)

func TestAgent_Self(t *testing.T) {
Expand Down Expand Up @@ -558,6 +559,29 @@ func TestAgent_ForceLeave(t *testing.T) {
}
}

// TestAgent_Monitor checks that the client's Monitor call streams agent logs
// by reading the first line from the stream and validating its content.
func TestAgent_Monitor(t *testing.T) {
	t.Parallel()
	client, server := makeClient(t)
	defer server.Stop()

	stream, err := client.Agent().Monitor("info", nil)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Give the agent a bounded amount of time to produce its first line.
	deadline := time.After(10 * time.Second)
	select {
	case line := <-stream:
		if !strings.Contains(line, "[INFO] raft: Initial configuration") {
			t.Fatalf("bad: %q", line)
		}
	case <-deadline:
		t.Fatalf("failed to get a log message")
	}
}

func TestServiceMaintenance(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
Expand Down
4 changes: 4 additions & 0 deletions command/agent/agent.go
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/serf/coordinate"
Expand Down Expand Up @@ -66,6 +67,9 @@ type Agent struct {
// Output sink for logs
logOutput io.Writer

// Used for streaming logs to
logWriter *logger.LogWriter

// We have one of a client or a server, depending
// on our configuration
server *consul.Server
Expand Down
82 changes: 82 additions & 0 deletions command/agent/agent_endpoint.go
Expand Up @@ -2,12 +2,15 @@ package agent

import (
"fmt"
"log"
"net/http"
"strconv"
"strings"

"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/logutils"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
)
Expand Down Expand Up @@ -393,6 +396,61 @@ func (s *HTTPServer) AgentNodeMaintenance(resp http.ResponseWriter, req *http.Re
return nil, nil
}

// AgentMonitor streams the agent's log lines to the HTTP client until the
// client connection is closed.
//
// Only GET is accepted (405 otherwise). The optional "loglevel" query
// parameter selects the minimum level to stream; it is upper-cased and
// defaults to "INFO". An unknown level yields a 400 with an explanatory body.
// An error is returned if the response writer cannot stream (it must
// implement both http.Flusher and http.CloseNotifier).
func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
	// Only GET supported
	if req.Method != "GET" {
		resp.WriteHeader(405)
		return nil, nil
	}

	// Get the provided loglevel, falling back to INFO, and upper case it
	logLevel := req.URL.Query().Get("loglevel")
	if logLevel == "" {
		logLevel = "INFO"
	}
	logLevel = strings.ToUpper(logLevel)

	// Create a level filter
	filter := logger.LevelFilter()
	filter.MinLevel = logutils.LogLevel(logLevel)
	if !logger.ValidateLevelFilter(filter.MinLevel, filter) {
		resp.WriteHeader(400)
		fmt.Fprintf(resp, "Unknown log level: %s", filter.MinLevel)
		return nil, nil
	}

	// Both capabilities are required for streaming. Check them with the
	// two-value assertion form so an unsupported writer produces an error
	// instead of a panic, and do so before registering the log handler.
	flusher, ok := resp.(http.Flusher)
	if !ok {
		return nil, fmt.Errorf("Streaming not supported")
	}
	closeNotifier, ok := resp.(http.CloseNotifier)
	if !ok {
		return nil, fmt.Errorf("Streaming not supported")
	}

	// Set up a log handler
	handler := &httpLogHandler{
		filter: filter,
		logCh:  make(chan string, 512),
		logger: s.logger,
	}
	s.agent.logWriter.RegisterHandler(handler)
	defer s.agent.logWriter.DeregisterHandler(handler)

	notify := closeNotifier.CloseNotify()

	// Stream logs until the connection is closed. The loop only exits via
	// the return below, so no statement is needed after it.
	for {
		select {
		case <-notify:
			return nil, nil
		case line := <-handler.logCh:
			resp.Write([]byte(line + "\n"))
			flusher.Flush()
		}
	}
}

// syncChanges is a helper function which wraps a blocking call to sync
// services and checks to the server. If the operation fails, we only
// only warn because the write did succeed and anti-entropy will sync later.
Expand All @@ -401,3 +459,27 @@ func (s *HTTPServer) syncChanges() {
s.logger.Printf("[ERR] agent: failed to sync changes: %v", err)
}
}

// httpLogHandler is the log handler used by the monitor HTTP endpoint. Its
// HandleLog method filters incoming log lines and queues the accepted ones
// on logCh.
type httpLogHandler struct {
// filter decides, per log line, whether the line is forwarded.
filter *logutils.LevelFilter
// logCh carries accepted log lines; sends to it are non-blocking.
logCh chan string
// logger is used to emit a warning when a line has to be dropped.
logger *log.Logger
}

// HandleLog queues a log line on logCh if it passes the handler's level
// filter. The send never blocks: when the channel is full the line is
// dropped and a warning is logged asynchronously.
func (h *httpLogHandler) HandleLog(log string) {
	// Only lines that pass the level filter are forwarded.
	if h.filter.Check([]byte(log)) {
		select {
		case h.logCh <- log:
			// Queued for the HTTP stream.
		default:
			// The warning is emitted from a separate goroutine because we
			// are being invoked from the logWriter, and logging here would
			// call Write() which already holds the lock. Logging
			// synchronously would therefore deadlock.
			go h.logger.Printf("[WARN] Dropping logs to monitor http endpoint")
		}
	}
}
69 changes: 69 additions & 0 deletions command/agent/agent_endpoint_test.go
@@ -1,8 +1,10 @@
package agent

import (
"bytes"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"os"
Expand All @@ -12,6 +14,7 @@ import (
"time"

"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/serf/serf"
Expand Down Expand Up @@ -1019,3 +1022,69 @@ func TestHTTPAgentRegisterServiceCheck(t *testing.T) {
t.Fatalf("bad: %#v", result["memcached_check2"])
}
}

// TestHTTPAgent_Monitor streams logs from the monitor endpoint into a
// recorder and verifies that the first five streamed lines match what was
// written to the agent's log output.
func TestHTTPAgent_Monitor(t *testing.T) {
	logWriter := logger.NewLogWriter(512)
	expectedLogs := bytes.Buffer{}
	// Named logOutput (not logger) so the logger package is not shadowed.
	logOutput := io.MultiWriter(os.Stdout, &expectedLogs, logWriter)

	dir, srv := makeHTTPServerWithConfigLog(t, nil, logOutput)
	srv.agent.logWriter = logWriter
	defer os.RemoveAll(dir)
	defer srv.Shutdown()
	defer srv.agent.Shutdown()

	// Begin streaming logs from the monitor endpoint
	req, _ := http.NewRequest("GET", "/v1/agent/monitor?loglevel=debug", nil)
	resp := newClosableRecorder()
	// Closing the notify channel makes the handler return when the test
	// ends, so its goroutine does not outlive the test.
	defer close(resp.closer)
	go func() {
		// Fatalf must only be called from the test goroutine; report the
		// failure with Errorf instead.
		if _, err := srv.AgentMonitor(resp, req); err != nil {
			t.Errorf("err: %s", err)
		}
	}()

	// done tells the reader goroutine to exit once the test is finished.
	done := make(chan struct{})
	defer close(done)

	// Write the incoming logs to a channel for reading
	// NOTE(review): resp.Body is read here while the handler goroutine
	// writes to it with no visible synchronization, and expectedLogs is
	// shared the same way - confirm under -race.
	logCh := make(chan string)
	go func() {
		for {
			line, err := resp.Body.ReadString('\n')
			if err != nil && err != io.EOF {
				t.Errorf("err: %v", err)
				return
			}
			if line == "" {
				// Nothing buffered yet: wait briefly instead of spinning.
				select {
				case <-done:
					return
				case <-time.After(10 * time.Millisecond):
				}
				continue
			}
			select {
			case logCh <- line:
			case <-done:
				return
			}
		}
	}()

	// Verify that the first 5 logs we get match the expected stream
	for i := 0; i < 5; i++ {
		select {
		case log := <-logCh:
			expected, err := expectedLogs.ReadString('\n')
			if err != nil {
				t.Fatalf("err: %v", err)
			}
			if log != expected {
				t.Fatalf("bad: %q %q", expected, log)
			}
		case <-time.After(10 * time.Second):
			t.Fatalf("failed to get log within timeout")
		}
	}
}

// closableRecorder is an httptest.ResponseRecorder that additionally exposes
// a CloseNotify method backed by the closer channel.
type closableRecorder struct {
	*httptest.ResponseRecorder
	closer chan bool
}

// newClosableRecorder returns a closableRecorder wrapping a fresh response
// recorder and an unbuffered closer channel.
func newClosableRecorder() *closableRecorder {
	return &closableRecorder{
		ResponseRecorder: httptest.NewRecorder(),
		closer:           make(chan bool),
	}
}

// CloseNotify returns the receive side of the recorder's closer channel.
func (r *closableRecorder) CloseNotify() <-chan bool {
	return r.closer
}
1 change: 1 addition & 0 deletions command/agent/command.go
Expand Up @@ -471,6 +471,7 @@ func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *log
c.Ui.Error(fmt.Sprintf("Error starting agent: %s", err))
return err
}
agent.logWriter = logWriter
c.agent = agent

// Setup the RPC listener
Expand Down
1 change: 1 addition & 0 deletions command/agent/http.go
Expand Up @@ -251,6 +251,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
}
s.handleFuncMetrics("/v1/agent/self", s.wrap(s.AgentSelf))
s.handleFuncMetrics("/v1/agent/maintenance", s.wrap(s.AgentNodeMaintenance))
s.handleFuncMetrics("/v1/agent/monitor", s.wrap(s.AgentMonitor))
s.handleFuncMetrics("/v1/agent/services", s.wrap(s.AgentServices))
s.handleFuncMetrics("/v1/agent/checks", s.wrap(s.AgentChecks))
s.handleFuncMetrics("/v1/agent/members", s.wrap(s.AgentMembers))
Expand Down
6 changes: 5 additions & 1 deletion command/agent/http_test.go
Expand Up @@ -28,6 +28,10 @@ func makeHTTPServer(t *testing.T) (string, *HTTPServer) {
}

// makeHTTPServerWithConfig is a convenience wrapper that calls
// makeHTTPServerWithConfigLog with the given config callback and a nil log
// writer, returning its results unchanged.
func makeHTTPServerWithConfig(t *testing.T, cb func(c *Config)) (string, *HTTPServer) {
return makeHTTPServerWithConfigLog(t, cb, nil)
}

func makeHTTPServerWithConfigLog(t *testing.T, cb func(c *Config), l io.Writer) (string, *HTTPServer) {
configTry := 0
RECONF:
configTry += 1
Expand All @@ -36,7 +40,7 @@ RECONF:
cb(conf)
}

dir, agent := makeAgent(t, conf)
dir, agent := makeAgentLog(t, conf, l)
servers, err := NewHTTPServers(agent, conf, agent.logOutput)
if err != nil {
if configTry < 3 {
Expand Down
14 changes: 13 additions & 1 deletion website/source/docs/agent/http/agent.html.markdown
Expand Up @@ -21,6 +21,7 @@ The following endpoints are supported:
* [`/v1/agent/members`](#agent_members) : Returns the members as seen by the local serf agent
* [`/v1/agent/self`](#agent_self) : Returns the local node configuration
* [`/v1/agent/maintenance`](#agent_maintenance) : Manages node maintenance mode
* [`/v1/agent/monitor`](#agent_monitor) : Streams logs from the local agent
* [`/v1/agent/join/<address>`](#agent_join) : Triggers the local agent to join a node
* [`/v1/agent/force-leave/<node>`](#agent_force_leave): Forces removal of a node
* [`/v1/agent/check/register`](#agent_check_register) : Registers a new local check
Expand Down Expand Up @@ -211,6 +212,17 @@ to aid human operators. If no reason is provided, a default value will be used i

The return code is 200 on success.

### <a name="agent_monitor"></a> /v1/agent/monitor

Added in Consul 0.7.2, this endpoint is hit with a GET and will stream logs from the
local agent until the connection is closed.

The `?loglevel` query parameter is optional. If provided, its value should be a text
string containing a log level to filter on, such as `info`. If no loglevel is provided,
`info` will be used as a default.

The return code is 200 on success.

### <a name="agent_join"></a> /v1/agent/join/\<address\>

This endpoint is hit with a GET and is used to instruct the agent to attempt to
Expand Down Expand Up @@ -403,7 +415,7 @@ body must look like:
],
"Address": "127.0.0.1",
"Port": 8000,
"EnableTagOverride": false,
"EnableTagOverride": false,
"Check": {
"DeregisterCriticalServiceAfter": "90m",
"Script": "/usr/local/bin/check_redis.py",
Expand Down

0 comments on commit db3ff8b

Please sign in to comment.