Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tools/goreplay-middleware: Add goreplay middleware #4496

Open
wants to merge 27 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ require (
cloud.google.com/go/storage v1.10.0 // indirect
github.com/ajg/form v0.0.0-20160822230020-523a5da1a92f // indirect
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect
github.com/buger/goreplay v1.3.2
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/structs v1.0.0 // indirect
github.com/gavv/monotime v0.0.0-20161010190848-47d58efa6955 // indirect
Expand All @@ -80,7 +81,7 @@ require (
github.com/jstemmer/go-junit-report v0.9.1 // indirect
github.com/klauspost/compress v1.15.0 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/kr/pretty v0.2.0 // indirect
github.com/kr/text v0.1.0 // indirect
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
Expand All @@ -95,7 +96,6 @@ require (
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1 // indirect
github.com/rs/xhandler v0.0.0-20160618193221-ed27b6fd6521 // indirect
github.com/sergi/go-diff v0.0.0-20161205080420-83532ca1c1ca // indirect
github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 // indirect
github.com/spf13/cast v0.0.0-20150508191742-4d07383ffe94 // indirect
github.com/spf13/jwalterweatherman v0.0.0-20141219030609-3d60171a6431 // indirect
github.com/stretchr/objx v0.3.0 // indirect
Expand Down
50 changes: 45 additions & 5 deletions go.sum

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions support/log/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package log

import (
"context"
"io"
"os"

loggly "github.com/segmentio/go-loggly"
Expand Down Expand Up @@ -80,6 +81,10 @@ func SetLevel(level logrus.Level) {
DefaultLogger.SetLevel(level)
}

// SetOut replaces the Out writer of the logger underlying DefaultLogger with
// out.
func SetOut(out io.Writer) {
DefaultLogger.entry.Logger.Out = out
}

func WithField(key string, value interface{}) *Entry {
result := DefaultLogger.WithField(key, value)
return result
Expand Down
Empty file.
1 change: 1 addition & 0 deletions tools/goreplay-middleware/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# goreplay-middleware
159 changes: 159 additions & 0 deletions tools/goreplay-middleware/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// The code below is a goreplay middleware used for regression testing current
// vs next Horizon version. The middleware system of goreplay is rather simple:
// it streams one of 3 message types to stdin: request (HTTP headers),
// original response and replayed response. On request we can modify the request
// and send it to stdout but we don't use this feature here: we send request
// to mirroring target as is. Finally, everything printed to stderr is the
// middleware log; this is where we put the information about the request if a
// diff is found.
//
// More information and diagrams about the middlewares can be found here:
// https://github.com/buger/goreplay/wiki/Middleware
package main

import (
"bufio"
"bytes"
"encoding/hex"
"fmt"
"io"
"os"
"time"

"github.com/buger/goreplay/proto"
"github.com/stellar/go/support/log"
)

// maxPerSecond is the upper bound on how many requests are registered for
// comparison within a single one-second window.
const maxPerSecond = 100

// Payload type markers. The first byte of every message received from
// goreplay is one of these values.
const (
	requestType          = byte('1')
	originalResponseType = byte('2')
	replayedResponseType = byte('3')
)

// Package-level middleware state shared by processAll and process.
var (
	// lastCheck is the moment the current rate-limit window started.
	lastCheck = time.Now()
	// reqsCheckedPerSeq counts the requests registered in the current window.
	reqsCheckedPerSeq = 0
	// Running totals reported in the periodic stats line.
	pendingRequestsAdded, ignoredCount, diffsCount, okCount int64
	// pendingRequests holds requests still waiting for their responses,
	// keyed by request id.
	pendingRequests map[string]*Request
)

// main runs the middleware over the process's standard streams. Note that
// processAll takes the log writer (stderr) before the output writer (stdout).
func main() {
	in, out, errOut := os.Stdin, os.Stdout, os.Stderr
	processAll(in, errOut, out)
}

func init() {
pendingRequests = make(map[string]*Request)
}

// processAll reads hex-encoded goreplay messages from stdin, one per line,
// decodes each of them and passes the result to process.
//
// stderr receives the middleware log (including the package logger output)
// and every diagnostic produced here; stdout receives whatever process emits
// back to goreplay. Lines that are empty or fail to decode are reported (when
// applicable) and skipped. The function returns when stdin is exhausted or the
// scanner fails; a scanner failure is reported on stderr.
func processAll(stdin io.Reader, stderr, stdout io.Writer) {
	log.SetOut(stderr)
	log.SetLevel(log.InfoLevel)

	// A single message is one line and can be large, so raise the scanner's
	// token limit to 20MB.
	const maxLineSize = 20 * 1024 * 1024
	scanner := bufio.NewScanner(stdin)
	scanner.Buffer(make([]byte, maxLineSize), maxLineSize)

	for scanner.Scan() {
		encoded := scanner.Bytes()
		if len(encoded) == 0 {
			// A blank line carries no message; skip it instead of handing an
			// empty payload to process.
			continue
		}

		decoded := make([]byte, hex.DecodedLen(len(encoded)))
		if _, err := hex.Decode(decoded, encoded); err != nil {
			fmt.Fprintf(stderr, "hex.Decode error: %v\n", err)
			continue
		}

		process(stderr, stdout, decoded)

		if len(pendingRequests) > 2000 {
			// Around 3-4% of responses are lost (not sure why) so pendingRequests can
			// grow indefinitely. Let's just truncate it when it becomes too big.
			// There is one gotcha here. Goreplay will still send requests
			// (`1` type payloads) even if traffic is rate limited. So if rate
			// limit is applied even more requests can be lost. So we should
			// use rate limiting implemented here when using middleware rather than
			// Goreplay's rate limit.
			pendingRequests = make(map[string]*Request)
		}
	}

	// Scan returns false both at EOF and on failure (for example a line longer
	// than the buffer), so the error can only be observed after the loop.
	if err := scanner.Err(); err != nil {
		fmt.Fprintf(stderr, "scanner.Err(): %v\n", err)
	}
}

// process handles a single decoded goreplay message.
//
// buf consists of a header line followed by the payload. The header contains
// space separated values of: payload type, request id, and request start time
// (or round-trip time for responses). For each request goreplay should send 3
// messages (request, original response, replayed response) sharing the same
// request id.
//
//   - request: registered in pendingRequests (at most maxPerSecond per second)
//     and echoed unmodified, hex-encoded, to stdout.
//   - original response: stored on the matching pending request, if any.
//   - replayed response: stored on the matching pending request, compared with
//     the original response, and the request is removed from pendingRequests.
//
// Diagnostics, periodic stats and mismatch reports are written to stderr.
// Messages without a header line or without a request id are reported and
// dropped.
func process(stderr, stdout io.Writer, buf []byte) {
	headerEnd := bytes.IndexByte(buf, '\n')
	if headerEnd < 0 {
		// Also covers an empty buf: without a header line there is neither a
		// payload type nor a request id to work with.
		io.WriteString(stderr, "Malformed payload: missing header line\n")
		return
	}
	header, payload := buf[:headerEnd], buf[headerEnd+1:]

	meta := bytes.Split(header, []byte(" "))
	if len(meta) < 2 {
		io.WriteString(stderr, "Malformed payload: header has no request id\n")
		return
	}

	// First byte indicates the payload type.
	payloadType := buf[0]
	reqID := string(meta[1])

	switch payloadType {
	case requestType:
		if time.Since(lastCheck) > time.Second {
			// A new one-second window starts: reset the rate limit counter.
			reqsCheckedPerSeq = 0
			lastCheck = time.Now()

			// Print stats every second
			fmt.Fprintf(
				stderr,
				"middleware stats: pendingRequests=%d requestsAdded=%d ok=%d diffs=%d ignored=%d\n",
				len(pendingRequests),
				pendingRequestsAdded,
				okCount,
				diffsCount,
				ignoredCount,
			)
		}

		// Only the first maxPerSecond requests of each window are tracked for
		// comparison; the rest are still forwarded below.
		if reqsCheckedPerSeq < maxPerSecond {
			pendingRequests[reqID] = &Request{
				Headers: payload,
			}
			pendingRequestsAdded++
			reqsCheckedPerSeq++
		}

		// Emitting data back, without modification
		if _, err := io.WriteString(stdout, hex.EncodeToString(buf)+"\n"); err != nil {
			fmt.Fprintf(stderr, "stdout.WriteString error: %v\n", err)
		}
	case originalResponseType:
		if req, ok := pendingRequests[reqID]; ok {
			// Original response can arrive after mirrored so this should be improved
			// instead of ignoring this case.
			req.OriginalResponse = payload
		}
	case replayedResponseType:
		if req, ok := pendingRequests[reqID]; ok {
			req.MirroredResponse = payload

			switch {
			case req.IsIgnored():
				ignoredCount++
			case !req.ResponseEquals():
				// TODO in the future publish the results to S3 for easier processing
				log.WithFields(log.F{
					"expected": req.OriginalBody(),
					"actual":   req.MirroredBody(),
					"headers":  string(req.Headers),
					"path":     string(proto.Path(req.Headers)),
				}).Info("Mismatch found")
				diffsCount++
			default:
				okCount++
			}

			delete(pendingRequests, reqID)
		}
	default:
		io.WriteString(stderr, "Unknown message type\n")
	}
}
49 changes: 49 additions & 0 deletions tools/goreplay-middleware/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package main

import (
"bytes"
"encoding/hex"
"strings"
"testing"

"github.com/stretchr/testify/assert"
)

// TestProcess pushes one message of each goreplay payload type, all sharing
// the same request id, through processAll and checks the output streams and
// the pendingRequests bookkeeping after every step. The steps depend on each
// other through package-level state and must run in this order.
func TestProcess(t *testing.T) {
	// run feeds a single hex-encoded message to processAll and returns what
	// was written to stdout and stderr.
	run := func(message string) (string, string) {
		var stdout, stderr bytes.Buffer
		stdin := strings.NewReader(hex.EncodeToString([]byte(message)))
		processAll(stdin, &stderr, &stdout)
		return stdout.String(), stderr.String()
	}

	// Type 1 (request): the same message is returned on stdout.
	request := "1 ID\nGET /ledgers HTTP/1.1\r\nHost: horizon.stellar.org\r\n\r\n"
	out, errOut := run(request)

	decodedOut, err := hex.DecodeString(strings.TrimRight(out, "\n"))
	assert.NoError(t, err)
	assert.Equal(t, request, string(decodedOut))
	assert.Equal(t, "", errOut)

	// Type 2 (original response): saved on the pending request, nothing is
	// emitted.
	out, errOut = run("2 ID\nHeader: true\r\n\r\nBody")

	assert.Len(t, pendingRequests, 1)
	assert.Equal(t, "", out)
	assert.Equal(t, "", errOut)

	// Type 3 (replayed response): compared with the original response and the
	// pending request is removed.
	out, errOut = run("3 ID\nHeader: true\r\n\r\nBody")

	assert.Len(t, pendingRequests, 0)
	assert.Equal(t, "", out)
	assert.Equal(t, "", errOut)
}
99 changes: 99 additions & 0 deletions tools/goreplay-middleware/request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package main

import (
"bytes"
"regexp"
"strings"

"github.com/buger/goreplay/proto"
)

// horizonURLs matches the scheme and host part of URLs that point at a Horizon
// instance: anything from `https://` up to the first `stellar.org` or
// `127.0.0.1:8000`. The dots of the IP address are escaped so that only the
// literal address matches.
var horizonURLs = regexp.MustCompile(`https:\/\/.*?(stellar\.org|127\.0\.0\.1:8000)`)

// findResultMetaXDR matches a `"result_meta_xdr": "..."` JSON member together
// with its trailing comma.
var findResultMetaXDR = regexp.MustCompile(`"result_meta_xdr":[ ]?"([^"]*)",`)

// removeRegexps lists regular expressions whose matches are replaced with an
// empty string before responses are compared. Use it to exclude known
// differences between the responses of two Horizon versions.
//
// Example: the next Horizon version adds a new bool field `is_authorized` to
// the account balances list. Adding a pattern that removes this field keeps it
// from being reported for each `/accounts/{id}` response.
var removeRegexps = []*regexp.Regexp{
	// Empty by default.
}

// replace is a single substitution rule: every match of regexp is replaced
// with repl.
type replace struct {
	// regexp selects the text to substitute.
	regexp *regexp.Regexp
	// repl is the replacement passed to ReplaceAllString.
	repl string
}

// replaceRegexps works like removeRegexps but substitutes the matched data
// instead of removing it.
var replaceRegexps = []replace{
	// Empty by default.
}

// Request groups the raw payloads of the three goreplay messages that share
// one request id.
type Request struct {
// Headers is the payload of the request message.
Headers []byte
// OriginalResponse is the payload of the original response message. It is
// empty until that message has been received.
OriginalResponse []byte
// MirroredResponse is the payload of the replayed response message.
MirroredResponse []byte
}

// OriginalBody returns the body of the original response, as extracted by
// proto.Body, converted to a string.
func (r *Request) OriginalBody() string {
	body := proto.Body(r.OriginalResponse)
	return string(body)
}

// MirroredBody returns the body of the mirrored (replayed) response, as
// extracted by proto.Body, converted to a string.
func (r *Request) MirroredBody() string {
	body := proto.Body(r.MirroredResponse)
	return string(body)
}

// IsIgnored reports whether the request must be excluded from the response
// comparison. A request is ignored when any of the following holds:
//   - no original response has been recorded,
//   - the Latest-Ledger headers of the two responses differ,
//   - either response carries a Transfer-Encoding header,
//   - the request's Accept-Encoding header contains "gzip",
//   - the request's Accept header contains "event-stream".
func (r *Request) IsIgnored() bool {
	if len(r.OriginalResponse) == 0 {
		return true
	}

	// header looks up a single header value in a raw payload.
	header := func(payload []byte, name string) []byte {
		return proto.Header(payload, []byte(name))
	}

	sameLedger := bytes.Equal(
		header(r.OriginalResponse, "Latest-Ledger"),
		header(r.MirroredResponse, "Latest-Ledger"),
	)
	if !sameLedger {
		return true
	}

	// Responses below are not supported but support can be added with some effort
	if len(header(r.OriginalResponse, "Transfer-Encoding")) > 0 ||
		len(header(r.MirroredResponse, "Transfer-Encoding")) > 0 {
		return true
	}

	if strings.Contains(string(header(r.Headers, "Accept-Encoding")), "gzip") {
		return true
	}

	return strings.Contains(string(header(r.Headers, "Accept")), "event-stream")
}

// ResponseEquals reports whether the bodies of the original and the mirrored
// response are equal once both have been passed through
// normalizeResponseBody.
func (r *Request) ResponseEquals() bool {
	expected := normalizeResponseBody(proto.Body(r.OriginalResponse))
	actual := normalizeResponseBody(proto.Body(r.MirroredResponse))
	return expected == actual
}

// normalizeResponseBody strips the parts of a response body that are allowed
// to differ, so that two bodies can be compared byte by byte. The steps are
// applied in this order: result_meta_xdr members are removed, Horizon URLs are
// removed, then every removeRegexps and replaceRegexps rule is applied. May
// require updating on new releases.
func normalizeResponseBody(body []byte) string {
	// `result_meta_xdr` can differ between core instances (confirmed this with core team)
	out := findResultMetaXDR.ReplaceAllString(string(body), "")
	// Remove Horizon URL from the _links
	out = horizonURLs.ReplaceAllString(out, "")

	for i := range removeRegexps {
		out = removeRegexps[i].ReplaceAllString(out, "")
	}

	for i := range replaceRegexps {
		rule := replaceRegexps[i]
		out = rule.regexp.ReplaceAllString(out, rule.repl)
	}

	return out
}