Skip to content

Commit

Permalink
JSON Checks for Heartbeat HTTP Monitors (elastic#8667)
Browse files Browse the repository at this point in the history
This commit adds a new `json` check for HTTP responses letting users define an arbitrary condition to match against parsed JSON to determine whether an endpoint is up or down.

The nice thing about structured checks like this is that it makes it easy for users to precisely piggy-back on top of existing JSON endpoints, or write their own where a given key/value could indicate the health of an external system. In a sense, it allows users to write a healthcheck endpoint.

An example can be seen below:

```yaml
heartbeat.monitors:
- type: http
  # List or urls to query
  urls: ["http://localhost:9200"]
  schedule: '@every 10s'
  check.response.json:
    - description: check version
      condition: equals.version.number: "6.4.0"
```


(cherry picked from commit 22ba375)
  • Loading branch information
andrewvc committed Nov 9, 2018
1 parent 7c6081d commit 09676ec
Show file tree
Hide file tree
Showing 11 changed files with 218 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Expand Up @@ -150,6 +150,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff]
*Journalbeat*

- Add journalbeat. {pull}8703[8703]
- Add the ability to check against JSON HTTP bodies with conditions. {pull}8667[8667]

*Metricbeat*

Expand Down
8 changes: 8 additions & 0 deletions heartbeat/_meta/beat.reference.yml
Expand Up @@ -218,6 +218,14 @@ heartbeat.monitors:
# Required response contents.
#body:

# Parses the body as JSON, then checks against the given condition expression
#json:
#- description: Explanation of what the check does
# condition:
# equals:
# myField: expectedValue


# NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE
# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
Expand Down
7 changes: 6 additions & 1 deletion heartbeat/docs/heartbeat-options.asciidoc
Expand Up @@ -446,6 +446,7 @@ Under `check.response`, specify these options:
it's set to 0, any status code other than 404 is accepted.
*`headers`*:: The required response headers.
*`body`*:: A list of regular expressions to match the the body output. Only a single expression needs to match.
*`json`*:: A list of <<conditions,condition>> expressions executed against the body when parsed as JSON.

The following configuration shows how to check the response when the body
contains JSON:
Expand All @@ -461,7 +462,11 @@ contains JSON:
'X-API-Key': '12345-mykey-67890'
check.response:
status: 200
body: '{"status": "ok"}'
json:
- description: check status
condition:
equals:
status: ok
-------------------------------------------------------------------------------

The following configuration shows how to check the response for multiple regex
Expand Down
8 changes: 8 additions & 0 deletions heartbeat/heartbeat.reference.yml
Expand Up @@ -218,6 +218,14 @@ heartbeat.monitors:
# Required response contents.
#body:

# Parses the body as JSON, then checks against the given condition expression
#json:
#- description: Explanation of what the check does
# condition:
# equals:
# myField: expectedValue


# NOTE: THIS FEATURE IS DEPRECATED AND WILL BE REMOVED IN A FUTURE RELEASE
# Configure file json file to be watched for changes to the monitor:
#watch.poll_file:
Expand Down
64 changes: 62 additions & 2 deletions heartbeat/monitors/active/http/check.go
Expand Up @@ -18,12 +18,18 @@
package http

import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"strings"

pkgerrors "github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/match"
"github.com/elastic/beats/libbeat/conditions"
)

type RespCheck func(*http.Response) error
Expand All @@ -32,7 +38,7 @@ var (
errBodyMismatch = errors.New("body mismatch")
)

func makeValidateResponse(config *responseParameters) RespCheck {
func makeValidateResponse(config *responseParameters) (RespCheck, error) {
var checks []RespCheck

if config.Status > 0 {
Expand All @@ -49,7 +55,15 @@ func makeValidateResponse(config *responseParameters) RespCheck {
checks = append(checks, checkBody(config.RecvBody))
}

return checkAll(checks...)
if len(config.RecvJSON) > 0 {
jsonChecks, err := checkJSON(config.RecvJSON)
if err != nil {
return nil, err
}
checks = append(checks, jsonChecks)
}

return checkAll(checks...), nil
}

func checkOK(_ *http.Response) error { return nil }
Expand Down Expand Up @@ -115,3 +129,49 @@ func checkBody(body []match.Matcher) RespCheck {
return errBodyMismatch
}
}

func checkJSON(checks []*jsonResponseCheck) (RespCheck, error) {
type compiledCheck struct {
description string
condition conditions.Condition
}

var compiledChecks []compiledCheck

for _, check := range checks {
cond, err := conditions.NewCondition(check.Condition)
if err != nil {
return nil, err
}
compiledChecks = append(compiledChecks, compiledCheck{check.Description, cond})
}

return func(r *http.Response) error {
decoded := &common.MapStr{}
err := json.NewDecoder(r.Body).Decode(decoded)

if err != nil {
body, _ := ioutil.ReadAll(r.Body)
return pkgerrors.Wrapf(err, "could not parse JSON for body check with condition. Source: %s", body)
}

var errorDescs []string
for _, compiledCheck := range compiledChecks {
ok := compiledCheck.condition.Check(decoded)
if !ok {
errorDescs = append(errorDescs, compiledCheck.description)
}
}

if len(errorDescs) > 0 {
return fmt.Errorf(
"JSON body did not match %d conditions '%s' for monitor. Received JSON %+v",
len(errorDescs),
strings.Join(errorDescs, ","),
decoded,
)
}

return nil
}, nil
}
71 changes: 71 additions & 0 deletions heartbeat/monitors/active/http/check_test.go
Expand Up @@ -24,7 +24,12 @@ import (
"net/http/httptest"
"testing"

"github.com/elastic/beats/libbeat/common"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/libbeat/common/match"
"github.com/elastic/beats/libbeat/conditions"
)

func TestCheckBody(t *testing.T) {
Expand Down Expand Up @@ -125,3 +130,69 @@ func TestCheckBody(t *testing.T) {
})
}
}

func TestCheckJson(t *testing.T) {
fooBazEqualsBar := common.MustNewConfigFrom(map[string]interface{}{"equals": map[string]interface{}{"foo": map[string]interface{}{"baz": "bar"}}})
fooBazEqualsBarConf := &conditions.Config{}
err := fooBazEqualsBar.Unpack(fooBazEqualsBarConf)
require.NoError(t, err)

fooBazEqualsBarDesc := "foo.baz equals bar"

var tests = []struct {
description string
body string
condDesc string
condConf *conditions.Config
result bool
}{
{
"positive match",
"{\"foo\": {\"baz\": \"bar\"}}",
fooBazEqualsBarDesc,
fooBazEqualsBarConf,
true,
},
{
"Negative match",
"{\"foo\": 123}",
fooBazEqualsBarDesc,
fooBazEqualsBarConf,
false,
},
{
"unparseable",
`notjson`,
fooBazEqualsBarDesc,
fooBazEqualsBarConf,
false,
},
}

for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, test.body)
}))
defer ts.Close()

res, err := http.Get(ts.URL)
if err != nil {
log.Fatal(err)
}

checker, err := checkJSON([]*jsonResponseCheck{{test.condDesc, test.condConf}})
require.NoError(t, err)
checkRes := checker(res)

if result := checkRes == nil; result != test.result {
if test.result {
t.Fatalf("Expected condition: '%s' to match body: %s. got: %s", test.condDesc, test.body, checkRes)
} else {
t.Fatalf("Did not expect condition: '%s' to match body: %s. got: %s", test.condDesc, test.body, checkRes)
}
}
})
}

}
15 changes: 12 additions & 3 deletions heartbeat/monitors/active/http/config.go
Expand Up @@ -22,6 +22,8 @@ import (
"strings"
"time"

"github.com/elastic/beats/libbeat/conditions"

"github.com/elastic/beats/libbeat/common/match"
"github.com/elastic/beats/libbeat/common/transport/tlscommon"

Expand Down Expand Up @@ -68,9 +70,15 @@ type requestParameters struct {

type responseParameters struct {
// expected HTTP response configuration
Status uint16 `config:"status" verify:"min=0, max=699"`
RecvHeaders map[string]string `config:"headers"`
RecvBody []match.Matcher `config:"body"`
Status uint16 `config:"status" verify:"min=0, max=699"`
RecvHeaders map[string]string `config:"headers"`
RecvBody []match.Matcher `config:"body"`
RecvJSON []*jsonResponseCheck `config:"json"`
}

type jsonResponseCheck struct {
Description string `config:"description"`
Condition *conditions.Config `config:"condition"`
}

type compressionConfig struct {
Expand All @@ -93,6 +101,7 @@ var defaultConfig = Config{
Status: 0,
RecvHeaders: nil,
RecvBody: []match.Matcher{},
RecvJSON: nil,
},
},
}
Expand Down
5 changes: 4 additions & 1 deletion heartbeat/monitors/active/http/http.go
Expand Up @@ -70,7 +70,10 @@ func create(
body = buf.Bytes()
}

validator := makeValidateResponse(&config.Check.Response)
validator, err := makeValidateResponse(&config.Check.Response)
if err != nil {
return nil, 0, err
}

jobs = make([]monitors.Job, len(config.URLs))

Expand Down
1 change: 1 addition & 0 deletions heartbeat/monitors/active/http/simple_transp.go
Expand Up @@ -184,6 +184,7 @@ func (t *SimpleTransport) readResponse(
) (*http.Response, error) {
reader := bufio.NewReader(conn)
resp, err := http.ReadResponse(reader, req)
resp.Body = comboConnReadCloser{conn, resp.Body}
if err != nil {
return nil, err
}
Expand Down
8 changes: 8 additions & 0 deletions heartbeat/tests/system/config/heartbeat.yml.j2
Expand Up @@ -29,6 +29,14 @@ heartbeat.monitors:
{% endfor %}
{% endif -%}


{%- if monitor.check_response_json is defined %}
check.response.json:
{%- for check in monitor.check_response_json %}
- {{check}}
{% endfor %}
{% endif -%}

{%- if monitor.fields is defined %}
{% if monitor.fields_under_root %}fields_under_root: true{% endif %}
fields:
Expand Down
37 changes: 37 additions & 0 deletions heartbeat/tests/system/test_monitor.py
Expand Up @@ -41,6 +41,43 @@ def test_http(self, status_code):
raise SkipTest
self.assert_fields_are_documented(output[0])

@parameterized.expand([
("up", '{"foo": {"baz": "bar"}}'),
("down", '{"foo": "unexpected"}'),
("down", 'notjson'),
])
def test_http_json(self, expected_status, body):
"""
Test JSON response checks
"""
server = self.start_server(body, 200)
try:
self.render_config_template(
monitors=[{
"type": "http",
"urls": ["http://localhost:{}".format(server.server_port)],
"check_response_json": [{
"description": "foo equals bar",
"condition": {
"equals": {"foo": {"baz": "bar"}}
}
}]
}]
)

try:
proc = self.start_beat()
self.wait_until(lambda: self.log_contains("heartbeat is running"))

self.wait_until(
lambda: self.output_has(lines=1))
finally:
proc.check_kill_and_wait()

self.assert_last_status(expected_status)
finally:
server.shutdown()

@parameterized.expand([
(lambda server: "localhost:{}".format(server.server_port), "up"),
# This IP is reserved in IPv4
Expand Down

0 comments on commit 09676ec

Please sign in to comment.