Skip to content

Commit

Permalink
Add --input-file-dry-run option
Browse files Browse the repository at this point in the history
Now you can get information about file content without performing the actual replay.
For example, it can tell you how many requests are in your files, and how long it will take to replay them.
  • Loading branch information
buger committed Jul 8, 2021
1 parent 61b377d commit 8e76559
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 21 deletions.
66 changes: 58 additions & 8 deletions input_file.go
Expand Up @@ -6,8 +6,10 @@ import (
"compress/gzip"
"container/heap"
"errors"
"expvar"
"fmt"
"io"
"math"
"os"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -51,9 +53,6 @@ func (h *payloadQueue) Pop() interface{} {
}

func (h payloadQueue) Idx(i int) *filePayload {
h.RLock()
defer h.RUnlock()

return h.s[i]
}

Expand Down Expand Up @@ -196,17 +195,22 @@ type FileInput struct {
speedFactor float64
loop bool
readDepth int
dryRun bool

stats *expvar.Map
}

// NewFileInput constructor for FileInput. Accepts file path as argument.
func NewFileInput(path string, loop bool, readDepth int) (i *FileInput) {
func NewFileInput(path string, loop bool, readDepth int, dryRun bool) (i *FileInput) {
i = new(FileInput)
i.data = make(chan []byte, 1000)
i.exit = make(chan bool)
i.path = path
i.speedFactor = 1
i.loop = loop
i.readDepth = readDepth
i.stats = expvar.NewMap("file-" + path)
i.dryRun = dryRun

if err := i.init(); err != nil {
return
Expand Down Expand Up @@ -246,7 +250,6 @@ func (i *FileInput) init() (err error) {
} else if matches, err = filepath.Glob(i.path); err != nil {
Debug(0, "[INPUT-FILE] Wrong file pattern", i.path, err)
return

}

if len(matches) == 0 {
Expand All @@ -260,6 +263,8 @@ func (i *FileInput) init() (err error) {
i.readers[idx] = newFileInputReader(p, i.readDepth)
}

i.stats.Add("reader_count", int64(len(matches)))

return nil
}

Expand All @@ -270,6 +275,7 @@ func (i *FileInput) PluginRead() (*Message, error) {
case <-i.exit:
return nil, ErrorStopped
case buf := <-i.data:
i.stats.Add("read_from", 1)
msg.Meta, msg.Data = payloadMetaWithBody(buf)
return &msg, nil
}
Expand All @@ -292,7 +298,7 @@ func (i *FileInput) nextReader() (next *fileInputReader) {
continue
}

if next == nil || r.queue.Idx(0).timestamp > next.queue.Idx(0).timestamp {
if next == nil || r.queue.Idx(0).timestamp < next.queue.Idx(0).timestamp {
next = r
continue
}
Expand All @@ -304,6 +310,11 @@ func (i *FileInput) nextReader() (next *fileInputReader) {
func (i *FileInput) emit() {
var lastTime int64 = -1

var maxWait, firstWait, minWait int64
minWait = math.MaxInt64

i.stats.Add("negative_wait", 0)

for {
select {
case <-i.exit:
Expand All @@ -325,18 +336,39 @@ func (i *FileInput) emit() {

reader.queue.RLock()
payload := heap.Pop(&reader.queue).(*filePayload)
i.stats.Add("total_counter", 1)
i.stats.Add("total_bytes", int64(len(payload.data)))
reader.queue.RUnlock()

if lastTime != -1 {
diff := payload.timestamp - lastTime

if firstWait == 0 {
firstWait = diff
}

if i.speedFactor != 1 {
diff = int64(float64(diff) / i.speedFactor)
}

if diff >= 0 {
lastTime = payload.timestamp
time.Sleep(time.Duration(diff))

if !i.dryRun {
time.Sleep(time.Duration(diff))
}

i.stats.Add("total_wait", diff)

if diff > maxWait {
maxWait = diff
}

if diff < minWait {
minWait = diff
}
} else {
i.stats.Add("negative_wait", 1)
}
} else {
lastTime = payload.timestamp
Expand All @@ -347,12 +379,30 @@ func (i *FileInput) emit() {
case <-i.exit:
return
default:
i.data <- payload.data
if !i.dryRun {
i.data <- payload.data
}
}
}

i.stats.Set("first_wait", time.Duration(firstWait))
i.stats.Set("max_wait", time.Duration(maxWait))
i.stats.Set("min_wait", time.Duration(minWait))

Debug(0, fmt.Sprintf("[INPUT-FILE] FileInput: end of file '%s'\n", i.path))

if i.dryRun {
fmt.Printf("Records found: %v\nFiles processed: %v\nBytes processed: %v\nMax wait: %v\nMin wait: %v\nFirst wait: %v\nIt will take `%v` to replay at current speed.\nFound %v records with out of order timestamp\n",
i.stats.Get("total_counter"),
i.stats.Get("reader_count"),
i.stats.Get("total_bytes"),
i.stats.Get("max_wait"),
i.stats.Get("min_wait"),
i.stats.Get("first_wait"),
time.Duration(i.stats.Get("total_wait").(*expvar.Int).Value()),
i.stats.Get("negative_wait"),
)
}
}

// Close closes this plugin
Expand Down
12 changes: 6 additions & 6 deletions input_file_test.go
Expand Up @@ -104,7 +104,7 @@ func TestInputFileMultipleFilesWithRequestsOnly(t *testing.T) {
file2.Write([]byte(payloadSeparator))
file2.Close()

input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100)
input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100, false)

for i := '1'; i <= '4'; i++ {
msg, _ := input.PluginRead()
Expand All @@ -130,7 +130,7 @@ func TestInputFileRequestsWithLatency(t *testing.T) {
file.Write([]byte("1 3 250000000\nrequest3"))
file.Write([]byte(payloadSeparator))

input := NewFileInput(fmt.Sprintf("/tmp/%d", rnd), false, 100)
input := NewFileInput(fmt.Sprintf("/tmp/%d", rnd), false, 100, false)

start := time.Now().UnixNano()
for i := 0; i < 3; i++ {
Expand Down Expand Up @@ -170,7 +170,7 @@ func TestInputFileMultipleFilesWithRequestsAndResponses(t *testing.T) {
file2.Write([]byte(payloadSeparator))
file2.Close()

input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100)
input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100, false)

for i := '1'; i <= '4'; i++ {
msg, _ := input.PluginRead()
Expand Down Expand Up @@ -198,7 +198,7 @@ func TestInputFileLoop(t *testing.T) {
file.Write([]byte(payloadSeparator))
file.Close()

input := NewFileInput(fmt.Sprintf("/tmp/%d", rnd), true, 100)
input := NewFileInput(fmt.Sprintf("/tmp/%d", rnd), true, 100, false)

// Even if we have just 2 requests in the file, it should loop indefinitely
for i := 0; i < 1000; i++ {
Expand Down Expand Up @@ -226,7 +226,7 @@ func TestInputFileCompressed(t *testing.T) {
name2 := output2.file.Name()
output2.Close()

input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100)
input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100, false)
for i := 0; i < 2000; i++ {
input.PluginRead()
}
Expand Down Expand Up @@ -326,7 +326,7 @@ func CreateCaptureFile(requestGenerator *RequestGenerator) *CaptureFile {
func ReadFromCaptureFile(captureFile *os.File, count int, callback writeCallback) (err error) {
wg := new(sync.WaitGroup)

input := NewFileInput(captureFile.Name(), false, 100)
input := NewFileInput(captureFile.Name(), false, 100, false)
output := NewTestOutput(func(msg *Message) {
callback(msg)
wg.Done()
Expand Down
9 changes: 5 additions & 4 deletions input_raw_test.go
Expand Up @@ -233,10 +233,11 @@ func TestInputRAWChunkedEncoding(t *testing.T) {

originAddr := strings.Replace(origin.Listener.Addr().String(), "[::]", "127.0.0.1", -1)
conf := RAWInputConfig{
Engine: capture.EnginePcap,
Expire: time.Second,
Protocol: ProtocolHTTP,
TrackResponse: true,
Engine: capture.EnginePcap,
Expire: time.Second,
Protocol: ProtocolHTTP,
TrackResponse: true,
AllowIncomplete: true,
}
input := NewRAWInput(originAddr, conf)

Expand Down
2 changes: 1 addition & 1 deletion output_file_test.go
Expand Up @@ -39,7 +39,7 @@ func TestFileOutput(t *testing.T) {
emitter.Close()

var counter int64
input2 := NewFileInput("/tmp/test_requests.gor", false, 100)
input2 := NewFileInput("/tmp/test_requests.gor", false, 100, false)
output2 := NewTestOutput(func(*Message) {
atomic.AddInt64(&counter, 1)
wg.Done()
Expand Down
1 change: 1 addition & 0 deletions output_http.go
Expand Up @@ -215,6 +215,7 @@ func (o *HTTPOutput) sendRequest(client *HTTPClient, msg *Message) {
if !isRequestPayload(msg.Meta) {
return
}

uuid := payloadID(msg.Meta)
start := time.Now()
resp, err := client.Send(msg.Data)
Expand Down
2 changes: 1 addition & 1 deletion plugins.go
Expand Up @@ -118,7 +118,7 @@ func NewPlugins() *InOutPlugins {
}

for _, options := range Settings.InputFile {
plugins.registerPlugin(NewFileInput, options, Settings.InputFileLoop, Settings.InputFileReadDepth)
plugins.registerPlugin(NewFileInput, options, Settings.InputFileLoop, Settings.InputFileReadDepth, Settings.InputFileDryRun)
}

for _, path := range Settings.OutputFile {
Expand Down
2 changes: 1 addition & 1 deletion s3_test.go
Expand Up @@ -127,7 +127,7 @@ func TestInputFileFromS3(t *testing.T) {
<-output.closeCh
}

input := NewFileInput(fmt.Sprintf("s3://test-gor-eu/%d", rnd, 100), false)
input := NewFileInput(fmt.Sprintf("s3://test-gor-eu/%d", rnd), false, 100, false)

buf := make([]byte, 1000)
for i := 0; i <= 19999; i++ {
Expand Down
2 changes: 2 additions & 0 deletions settings.go
Expand Up @@ -48,6 +48,7 @@ type AppSettings struct {
InputFile MultiOption `json:"input-file"`
InputFileLoop bool `json:"input-file-loop"`
InputFileReadDepth int `json:"input-file-read-depth"`
InputFileDryRun bool `json:"input-file-dry-run"`
OutputFile MultiOption `json:"output-file"`
OutputFileConfig FileOutputConfig

Expand Down Expand Up @@ -115,6 +116,7 @@ func init() {
flag.Var(&Settings.InputFile, "input-file", "Read requests from file: \n\tgor --input-file ./requests.gor --output-http staging.com")
flag.BoolVar(&Settings.InputFileLoop, "input-file-loop", false, "Loop input files, useful for performance testing.")
flag.IntVar(&Settings.InputFileReadDepth, "input-file-read-depth", 100, "GoReplay tries to read and cache multiple records, in advance. In parallel it also perform sorting of requests, if they came out of order. Since it needs hold this buffer in memory, bigger values can cause worse performance")
flag.BoolVar(&Settings.InputFileDryRun, "input-file-dry-run", false, "Simulate reading from the data source without replaying it. You will get information about expected replay time, number of found records etc.")

flag.Var(&Settings.OutputFile, "output-file", "Write incoming requests to file: \n\tgor --input-raw :80 --output-file ./requests.gor")
flag.DurationVar(&Settings.OutputFileConfig.FlushInterval, "output-file-flush-interval", time.Second, "Interval for forcing buffer flush to the file, default: 1s.")
Expand Down

0 comments on commit 8e76559

Please sign in to comment.