diff --git a/README.md b/README.md index a804579..b8c9534 100644 --- a/README.md +++ b/README.md @@ -84,3 +84,34 @@ func main() { } ``` + +## Splunk Writer ## +To support logging libraries, and other output, we've added an asynchronous Writer. It supports retries, and different intervals for flushing messages & max log messages in its buffer + +The easiest way to get access to the writer with an existing client is to do: + +```go +writer := splunkClient.Writer() +``` + +This will give you an io.Writer you can use to direct output to splunk. However, since the io.Writer() is asynchronous, it will never return an error from its Write() function. To access errors generated from the Client, +Instantiate your Writer this way: + +```go +splunk.Writer{ + Client: splunkClient +} +``` +Since the type will now be splunk.Writer(), you can access the `Errors()` function, which returns a channel of errors. You can then spin up a goroutine to listen on this channel and report errors, or you can handle however you like. + +Optionally, you can add more configuration to the writer. + +```go +splunk.Writer { + Client: splunkClient, + FlushInterval: 10 *time.Second, // How often we'll flush our buffer + FlushThreshold: 25, // Max messages we'll keep in our buffer, regardless of FlushInterval + MaxRetries: 2, // Number of times we'll retry a failed send +} +``` + diff --git a/splunk/splunk.go b/splunk/splunk.go index ac53bf7..6301365 100644 --- a/splunk/splunk.go +++ b/splunk/splunk.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "encoding/json" "errors" + "io" "net/http" "os" "time" @@ -136,6 +137,13 @@ func (c *Client) LogEvents(events []*Event) error { return c.doRequest(buf) } +//Writer is a convience method for creating an io.Writer from a Writer with default values +func (c *Client) Writer() io.Writer { + return &Writer{ + Client: c, + } +} + // Client.doRequest is used internally to POST the bytes of events to the Splunk server. func (c *Client) doRequest(b *bytes.Buffer) error { // make new request diff --git a/splunk/writer.go b/splunk/writer.go new file mode 100644 index 0000000..4138f5e --- /dev/null +++ b/splunk/writer.go @@ -0,0 +1,126 @@ +package splunk + +import ( + "sync" + "time" +) + +const ( + bufferSize = 100 + defaultInterval = 2 * time.Second + defaultThreshold = 10 + defaultRetries = 2 +) + +// Writer is a threadsafe, aysnchronous splunk writer. +// It implements io.Writer for usage in logging libraries, or whatever you want to send to splunk :) +// Writer.Client's configuration determines what source, sourcetype & index will be used for events +// Example for logrus: +// splunkWriter := &splunk.Writer {Client: client} +// logrus.SetOutput(io.MultiWriter(os.Stdout, splunkWriter)) +type Writer struct { + Client *Client + // How often the write buffer should be flushed to splunk + FlushInterval time.Duration + // How many Write()'s before buffer should be flushed to splunk + FlushThreshold int + // Max number of retries we should do when we flush the buffer + MaxRetries int + dataChan chan *message + errors chan error + once sync.Once +} + +// Associates some bytes with the time they were written +// Helpful if we have long flush intervals to more precisely record the time at which +// a message was written +type message struct { + data []byte + writtenAt time.Time +} + +// Writer asynchronously writes to splunk in batches +func (w *Writer) Write(b []byte) (int, error) { + // only initialize once. Keep all of our buffering in one thread + w.once.Do(func() { + // synchronously set up dataChan + w.dataChan = make(chan *message, bufferSize) + // Spin up single goroutine to listen to our writes + w.errors = make(chan error, bufferSize) + go w.listen() + }) + // Send the data to the channel + w.dataChan <- &message{ + data: b, + writtenAt: time.Now(), + } + // We don't know if we've hit any errors yet, so just say we're good + return len(b), nil +} + +// Errors returns a buffered channel of errors. Might be filled over time, might not +// Useful if you want to record any errors hit when sending data to splunk +func (w *Writer) Errors() <-chan error { + return w.errors +} + +// listen for messages +func (w *Writer) listen() { + if w.FlushInterval <= 0 { + w.FlushInterval = defaultInterval + } + if w.FlushThreshold == 0 { + w.FlushThreshold = defaultThreshold + } + ticker := time.NewTicker(w.FlushInterval) + buffer := make([]*message, 0) + //Define function so we can flush in several places + flush := func() { + // Go send the data to splunk + go w.send(buffer, w.MaxRetries) + // Make a new array since the old one is getting used by the splunk client now + buffer = make([]*message, 0) + } + for { + select { + case <-ticker.C: + if len(buffer) > 0 { + flush() + } + case d := <-w.dataChan: + buffer = append(buffer, d) + if len(buffer) > w.FlushThreshold { + flush() + } + } + } +} + +// send sends data to splunk, retrying upon failure +func (w *Writer) send(messages []*message, retries int) { + // Create events from our data so we can send them to splunk + events := make([]*Event, len(messages)) + for i, m := range messages { + // Use the configuration of the Client for the event + events[i] = w.Client.NewEventWithTime(m.writtenAt.Unix(), m.data, w.Client.Source, w.Client.SourceType, w.Client.Index) + } + // Send the events to splunk + err := w.Client.LogEvents(events) + // If we had any failures, retry as many times as they requested + if err != nil { + for i := 0; i < retries; i++ { + // retry + err = w.Client.LogEvents(events) + if err == nil { + return + } + } + // if we've exhausted our max retries, let someone know via Errors() + // might not have retried if retries == 0 + select { + case w.errors <- err: + // Don't block in case no one is listening or our errors channel is full + default: + } + } +} diff --git a/splunk/writer_test.go b/splunk/writer_test.go new file mode 100644 index 0000000..e59fe98 --- /dev/null +++ b/splunk/writer_test.go @@ -0,0 +1,91 @@ +package splunk + +import ( + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" +) + +func TestWriter_Write(t *testing.T) { + numWrites := 1000 + numMessages := 0 + lock := sync.Mutex{} + notify := make(chan bool, numWrites) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + b, _ := ioutil.ReadAll(r.Body) + split := strings.Split(string(b), "\n") + num := 0 + // Since we batch our logs up before we send them: + // Increment our messages counter by one for each JSON object we got in this response + // We don't know how many responses we'll get, we only care about the number of messages + for _, line := range split { + if strings.HasPrefix(line, "{") { + num++ + notify <- true + } + } + lock.Lock() + numMessages = numMessages + num + lock.Unlock() + })) + + // Create a writer that's flushing constantly. We want this test to run + // quickly + writer := Writer{ + Client: NewClient(server.Client(), server.URL, "", "", "", ""), + FlushInterval: 1 * time.Millisecond, + } + // Send a bunch of messages in separate goroutines to make sure we're properly + // testing Writer's concurrency promise + for i := 0; i < numWrites; i++ { + go writer.Write([]byte(fmt.Sprintf("%d", i))) + } + // To notify our test we've collected everything we need. + doneChan := make(chan bool) + go func() { + for i := 0; i < numWrites; i++ { + // Do nothing, just loop through to the next one + <-notify + } + doneChan <- true + }() + select { + case <-doneChan: + // Do nothing, we're good + case <-time.After(1 * time.Second): + t.Errorf("Timed out waiting for messages") + } + // We may have received more than numWrites amount of messages, check that case + if numMessages != numWrites { + t.Errorf("Didn't get the right number of messages, expected %d, got %d", numWrites, numMessages) + } +} + +func TestWriter_Errors(t *testing.T) { + numMessages := 1000 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprintln(w, "bad request") + })) + writer := Writer{ + Client: NewClient(server.Client(), server.URL, "", "", "", ""), + // Will flush after the last message is sent + FlushThreshold: numMessages - 1, + // Don't let the flush interval cause raciness + FlushInterval: 5 * time.Minute, + } + for i := 0; i < numMessages; i++ { + _, _ = writer.Write([]byte("some data")) + } + select { + case <-writer.Errors(): + // good to go, got our error + case <-time.After(1 * time.Second): + t.Errorf("Timed out waiting for error, should have gotten 1 error") + } +}