Skip to content

Commit

Permalink
log: add filepath based headers in headers config to http servers
Browse files Browse the repository at this point in the history
This commit retains `Headers` as a `map[string]string`
and introduces `FileBasedHeaders` which is also a `map[string]string`
so that the user provided yaml configs can now additionally work with
filepaths.
`Headers` cannot share the same keys as `FileBasedHeaders`

example of using the new field:
file-based-headers:
  {X-CRDB-HEADER-A: path/to/file, X-CRDB-HEADER-B: other/path/to/file}

Release note (ops change): Added `file-based-headers` field found in
`http-defaults`section of the log config which accepts key-filepath pairs.
This allows values found at filepaths to be updated without restarting
the cluster by sending SIGHUP to notify that values need to be refreshed.

Epic: CRDB-25399
Fixes: CRDB-31521
  • Loading branch information
Santamaura committed Oct 2, 2023
1 parent af05d81 commit 250bb36
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 12 deletions.
1 change: 1 addition & 0 deletions docs/generated/logsinks.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ Type-specific configuration options:
| `timeout` | the HTTP timeout. Defaults to 0 for no timeout. Inherited from `http-defaults.timeout` if not specified. |
| `disable-keep-alives` | causes the logging sink to re-establish a new connection for every outgoing log message. This option is intended for testing only and can cause excessive network overhead in production systems. Inherited from `http-defaults.disable-keep-alives` if not specified. |
| `headers` | a list of headers to attach to each HTTP request Inherited from `http-defaults.headers` if not specified. |
| `file-based-headers` | a list of headers with filepaths whose contents are attached to each HTTP request Inherited from `http-defaults.file-based-headers` if not specified. |
| `compression` | can be "none" or "gzip" to enable gzip compression. Set to "gzip" by default. Inherited from `http-defaults.compression` if not specified. |


Expand Down
1 change: 1 addition & 0 deletions pkg/util/log/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ go_test(
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_x_net//trace",
"@org_golang_x_sys//unix",
],
)

Expand Down
67 changes: 66 additions & 1 deletion pkg/util/log/http_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ import (
"fmt"
"net/http"
"net/url"
"os"

"github.com/cockroachdb/cockroach/pkg/cli/exit"
"github.com/cockroachdb/cockroach/pkg/util/httputil"
"github.com/cockroachdb/cockroach/pkg/util/log/logconfig"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -62,6 +64,24 @@ func newHTTPSink(c logconfig.HTTPSinkConfig) (*httpSink, error) {

hs.config = &c

staticHeaders := make(map[string]string, len(c.Headers))
dhFilepaths := make(map[string]string, len(c.Headers))
for key, val := range c.Headers {
staticHeaders[key] = val
}
for key, val := range c.FileBasedHeaders {
dhFilepaths[key] = val
}
hs.staticHeaders = staticHeaders
if len(dhFilepaths) > 0 {
hs.dynamicHeaders = &dynamicHeaders{
headerToFilepath: dhFilepaths,
}
err := hs.RefreshDynamicHeaders()
if err != nil {
return nil, err
}
}
return hs, nil
}

Expand All @@ -71,6 +91,19 @@ type httpSink struct {
contentType string
doRequest func(sink *httpSink, logEntry []byte) (*http.Response, error)
config *logconfig.HTTPSinkConfig
// staticHeaders holds all the config headers defined by direct values.
staticHeaders map[string]string
// dynamicHeaders holds all the config headers defined by values from files.
// It will be nil if there are no filepaths provided.
dynamicHeaders *dynamicHeaders
}

type dynamicHeaders struct {
headerToFilepath map[string]string
mu struct {
syncutil.Mutex
headerToValue map[string]string
}
}

// output emits some formatted bytes to this sink.
Expand Down Expand Up @@ -123,9 +156,20 @@ func doPost(hs *httpSink, b []byte) (*http.Response, error) {
req.Header.Add(httputil.ContentEncodingHeader, httputil.GzipEncoding)
}

for k, v := range hs.config.Headers {
// Add both the staticHeaders and dynamicHeaders to the request.
for k, v := range hs.staticHeaders {
req.Header.Add(k, v)
}
// If the filepathMap was populated we know to check the values.
if hs.dynamicHeaders != nil {
func() {
hs.dynamicHeaders.mu.Lock()
defer hs.dynamicHeaders.mu.Unlock()
for k, v := range hs.dynamicHeaders.mu.headerToValue {
req.Header.Add(k, v)
}
}()
}
req.Header.Add(httputil.ContentTypeHeader, hs.contentType)
resp, err := hs.client.Do(req)
if err != nil {
Expand Down Expand Up @@ -172,3 +216,24 @@ func (e HTTPLogError) Error() string {
"received %v response attempting to log to [%v]",
e.StatusCode, e.Address)
}

// RefreshDynamicHeaders loads and sets the new dynamic headers for a given sink.
// It iterates over dynamicHeaders.filepath reading each file for contents and then
// updating dynamicHeaders.mu.value.
func (hs *httpSink) RefreshDynamicHeaders() error {
if hs.dynamicHeaders == nil {
return nil
}
dhVals := make(map[string]string, len(hs.dynamicHeaders.headerToFilepath))
for key, filepath := range hs.dynamicHeaders.headerToFilepath {
data, err := os.ReadFile(filepath)
if err != nil {
return err
}
dhVals[key] = string(data)
}
hs.dynamicHeaders.mu.Lock()
defer hs.dynamicHeaders.mu.Unlock()
hs.dynamicHeaders.mu.headerToValue = dhVals
return nil
}
60 changes: 50 additions & 10 deletions pkg/util/log/http_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"io"
"net"
"net/http"
"os"
"path/filepath"
"strings"
"testing"
"time"
Expand All @@ -28,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sys/unix"
)

var (
Expand All @@ -53,6 +56,7 @@ func testBase(
fn func(header http.Header, body string) error,
hangServer bool,
deadline time.Duration,
recall time.Duration,
) {
sc := ScopeWithoutShowLogs(t)
defer sc.Close(t)
Expand Down Expand Up @@ -164,6 +168,13 @@ func testBase(
return
}

// Issue a second log event if recall is specified so that the test can be
// run again.
if recall > 0 {
time.Sleep(recall)
Ops.Infof(context.Background(), "hello world")
}

// If the test was not requiring a timeout, it was requiring some
// logging message to match the predicate. If we don't see the
// predicate match, it is a test failure.
Expand Down Expand Up @@ -200,7 +211,7 @@ func TestMessageReceived(t *testing.T) {
return nil
}

testBase(t, defaults, testFn, false /* hangServer */, time.Duration(0))
testBase(t, defaults, testFn, false /* hangServer */, time.Duration(0), time.Duration(0))
}

// TestHTTPSinkTimeout verifies that a log call to a hanging server doesn't last
Expand All @@ -223,7 +234,7 @@ func TestHTTPSinkTimeout(t *testing.T) {
},
}

testBase(t, defaults, nil /* testFn */, true /* hangServer */, 500*time.Millisecond)
testBase(t, defaults, nil /* testFn */, true /* hangServer */, 500*time.Millisecond, time.Duration(0))
}

// TestHTTPSinkContentTypeJSON verifies that the HTTP sink content type
Expand Down Expand Up @@ -258,7 +269,7 @@ func TestHTTPSinkContentTypeJSON(t *testing.T) {
return nil
}

testBase(t, defaults, testFn, false /* hangServer */, time.Duration(0))
testBase(t, defaults, testFn, false /* hangServer */, time.Duration(0), time.Duration(0))
}

// TestHTTPSinkContentTypePlainText verifies that the HTTP sink content type
Expand Down Expand Up @@ -293,7 +304,7 @@ func TestHTTPSinkContentTypePlainText(t *testing.T) {
return nil
}

testBase(t, defaults, testFn, false /* hangServer */, time.Duration(0))
testBase(t, defaults, testFn, false /* hangServer */, time.Duration(0), time.Duration(0))
}

func TestHTTPSinkHeadersAndCompression(t *testing.T) {
Expand All @@ -305,6 +316,13 @@ func TestHTTPSinkHeadersAndCompression(t *testing.T) {
format := "json"
expectedContentType := "application/json"
expectedContentEncoding := logconfig.GzipCompression
val := "secret-value"
filepathVal := "another-secret-value"
filepathReplaceVal := "third-secret-value"
// Test filepath method of providing header values.
tempDir := t.TempDir()
filename := filepath.Join(tempDir, "filepath_test.txt")
require.NoError(t, os.WriteFile(filename, []byte(filepathVal), 0777))
defaults := logconfig.HTTPDefaults{
Address: &address,
Timeout: &timeout,
Expand All @@ -318,9 +336,12 @@ func TestHTTPSinkHeadersAndCompression(t *testing.T) {
},

Compression: &logconfig.GzipCompression,
Headers: map[string]string{"X-CRDB-TEST": "secret-value"},
// Provide both the old format and new format in order to test backwards compatability.
Headers: map[string]string{"X-CRDB-TEST": val},
FileBasedHeaders: map[string]string{"X-CRDB-TEST-2": filename},
}

var callCt int
testFn := func(header http.Header, body string) error {
t.Log(body)
contentType := header.Get("Content-Type")
Expand All @@ -340,17 +361,36 @@ func TestHTTPSinkHeadersAndCompression(t *testing.T) {
if !isGzipped([]byte(body)) {
return errors.New("expected gzipped body")
}
var matchCount int
filepathExpectedVal := filepathVal
if callCt > 0 {
filepathExpectedVal = filepathReplaceVal
}
for k, v := range header {
if k == "X-Crdb-Test" {
if k == "X-Crdb-Test" || k == "X-Crdb-Test-2" {
for _, vv := range v {
if vv == "secret-value" {
return nil
if vv == "secret-value" || vv == filepathExpectedVal {
matchCount++
}
}
}
}
return errors.New("expected to find special header in request")
if matchCount != 2 {
return errors.New("expected to find special header in request")
}
// If this is the first time the testFn has been called, update file contents and send SIGHUP.
if callCt == 0 {
callCt++
if err := os.WriteFile(filename, []byte(filepathReplaceVal), 0777); err != nil {
return err
}
t.Log("issuing SIGHUP")
if err := unix.Kill(unix.Getpid(), unix.SIGHUP); err != nil {
t.Fatal(err)
}
}
return nil
}

testBase(t, defaults, testFn, false /* hangServer */, time.Duration(0))
testBase(t, defaults, testFn, false /* hangServer */, time.Duration(0), 1*time.Second)
}
15 changes: 14 additions & 1 deletion pkg/util/log/log_flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,17 @@ func flushDaemon() {
}
}

// signalFlusher flushes the log(s) every time SIGHUP is received.
// signalFlusher updates any header values from files in the http sinks
// and also flushes the log(s) every time SIGHUP is received.
// This handles both the primary and secondary loggers.
func signalFlusher() {
ch := sysutil.RefreshSignaledChan()
for sig := range ch {
Ops.Infof(context.Background(), "%s received, flushing logs", sig)
err := RefreshHttpSinkHeaders()
if err != nil {
Ops.Infof(context.Background(), "error while refreshing http sink headers: %s", err)
}
FlushFiles()
}
}
Expand All @@ -153,3 +158,11 @@ func StartAlwaysFlush() {
// There may be something in the buffers already; flush it.
FlushFiles()
}

// RefreshHttpSinkHeaders will iterate over all http sinks and replace the sink's
// dynamicHeaders with newly generated dynamicHeaders.
func RefreshHttpSinkHeaders() error {
return logging.allSinkInfos.iterHttpSinks(func(hs *httpSink) error {
return hs.RefreshDynamicHeaders()
})
}
4 changes: 4 additions & 0 deletions pkg/util/log/logconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,10 @@ type HTTPDefaults struct {
// Headers is a list of headers to attach to each HTTP request
Headers map[string]string `yaml:",omitempty,flow"`

// FileBasedHeaders is a list of headers with filepaths whose contents are
// attached to each HTTP request
FileBasedHeaders map[string]string `yaml:"file-based-headers,omitempty,flow"`

// Compression can be "none" or "gzip" to enable gzip compression.
// Set to "gzip" by default.
Compression *string `yaml:",omitempty"`
Expand Down
4 changes: 4 additions & 0 deletions pkg/util/log/logconfig/testdata/validate
Original file line number Diff line number Diff line change
Expand Up @@ -598,12 +598,14 @@ sinks:
address: a
channels: STORAGE
headers: {X-CRDB-HEADER: header-value-a}
file-based-headers: {X-CRDB-FILE-HEADER: /a/path/to/file}
buffering:
max-staleness: 10s
b:
address: b
channels: OPS
headers: {X-CRDB-HEADER: header-value-b, X-ANOTHER-HEADER: zz-yy-bb}
file-based-headers: {X-ANOTHER-FILE-HEADER: /other/path/to/file, X-CRDB-FILE-HEADER: /some/path/to/file}
buffering:
flush-trigger-size: 5.0KiB
c:
Expand All @@ -630,6 +632,7 @@ sinks:
timeout: 2s
disable-keep-alives: false
headers: {X-CRDB-HEADER: header-value-a}
file-based-headers: {X-CRDB-FILE-HEADER: /a/path/to/file}
compression: gzip
filter: INFO
format: json-compact
Expand All @@ -650,6 +653,7 @@ sinks:
timeout: 2s
disable-keep-alives: false
headers: {X-ANOTHER-HEADER: zz-yy-bb, X-CRDB-HEADER: header-value-b}
file-based-headers: {X-ANOTHER-FILE-HEADER: /other/path/to/file, X-CRDB-FILE-HEADER: /some/path/to/file}
compression: gzip
filter: INFO
format: json-compact
Expand Down
8 changes: 8 additions & 0 deletions pkg/util/log/logconfig/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,14 @@ func (c *Config) validateHTTPSinkConfig(hsc *HTTPSinkConfig) error {
if *hsc.Compression != GzipCompression && *hsc.Compression != NoneCompression {
return errors.New("compression must be 'gzip' or 'none'")
}
// If both header types are populated, make sure theres no duplicate keys
if hsc.Headers != nil && hsc.FileBasedHeaders != nil {
for key := range hsc.Headers {
if _, exists := hsc.FileBasedHeaders[key]; exists {
return errors.Newf("headers and file-based-headers have the same key %s", key)
}
}
}
return c.ValidateCommonSinkConfig(hsc.CommonSinkConfig)
}

Expand Down
13 changes: 13 additions & 0 deletions pkg/util/log/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ func (r *sinkInfoRegistry) iterBufferedSinks(fn func(bs *bufferedSink) error) er
})
}

// iterHttpSink iterates over all the http sinks and stops at the first error
// encountered.
func (r *sinkInfoRegistry) iterHttpSinks(fn func(hs *httpSink) error) error {
return r.iter(func(si *sinkInfo) error {
if hs, ok := si.sink.(*httpSink); ok {
if err := fn(hs); err != nil {
return err
}
}
return nil
})
}

// put adds a sinkInfo into the registry.
func (r *sinkInfoRegistry) put(l *sinkInfo) {
r.mu.Lock()
Expand Down

0 comments on commit 250bb36

Please sign in to comment.