Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Middleware for custom request rewrite logic #162

Merged
merged 69 commits into from
Aug 24, 2015
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
208faec
Initial support for input modifiers
buger Dec 16, 2014
dcb8a1d
Merge branch 'master' into input-modifier
buger Jun 26, 2015
4f03f81
Working modifier implementation with examples
buger Jun 27, 2015
674ffc6
Start adding tests for traffic modifier
buger Jun 27, 2015
98f941b
Add support for complex commands
buger Jun 27, 2015
3e86456
Add more debug
buger Jun 28, 2015
8db14e3
Merge branch 'master' into input-modifier
buger Jul 9, 2015
e64aac9
Rename input modifier to middleware
buger Jul 9, 2015
f812120
Support only 1 middleware and rewrite tests
buger Jul 10, 2015
5e6f7e0
First middleware tests! :dance:
buger Jul 17, 2015
5cc4228
Merge branch 'master' into input-modifier
buger Jul 21, 2015
d62eccb
Make raw message expiration configurable to improve tests time
buger Jul 21, 2015
a9be181
Merge branch 'master' into input-modifier
buger Aug 3, 2015
3585419
Fix tests
buger Aug 3, 2015
9ab8b3c
Add header to request and response
buger Aug 10, 2015
5667cd3
Merge branch 'master' into input-modifier
buger Aug 10, 2015
5098e38
Fix tests, now properly receive responses
buger Aug 11, 2015
0009555
Apply fmt
buger Aug 11, 2015
8046468
Fix tests
buger Aug 11, 2015
2990fab
Track original responses
buger Aug 11, 2015
491541c
Response object should be separate message
buger Aug 13, 2015
bb319b5
Add request ID and token modifier middleware
buger Aug 14, 2015
70e7d8d
Reduce delay of raw input
buger Aug 14, 2015
56f0a3f
Remove too verbose output
buger Aug 15, 2015
4f2341b
Remove more debug
buger Aug 15, 2015
41c1421
Simulate some real-life server by adding small response delay
buger Aug 15, 2015
1a36374
Add some comments
buger Aug 15, 2015
a18079f
Improving docs
buger Aug 15, 2015
ddd5190
Improve docs
buger Aug 15, 2015
879ed48
Doc changes
buger Aug 15, 2015
0d44572
Some fixes
buger Aug 15, 2015
d109110
Ignore all release files
buger Aug 15, 2015
211be0c
Add request time and round-trip response time
buger Aug 17, 2015
69424bf
Fix hound
buger Aug 17, 2015
d96e339
Handle HTTP error codes
buger Aug 17, 2015
ea94dd0
fmt changes
buger Aug 18, 2015
3e763c4
Unify all input/output plugins
buger Aug 18, 2015
a971d8a
Change file format to be text based
buger Aug 18, 2015
7790b33
Refactor protocol scanner
buger Aug 18, 2015
8da4e54
Tcp communication should use same protocol as file based
buger Aug 18, 2015
ae35f88
Remove magic numbers
buger Aug 18, 2015
d6c776f
Verbose output for travis
buger Aug 18, 2015
b53fba0
Fix formatting issues
buger Aug 18, 2015
7cff781
Remove verbose flag
buger Aug 18, 2015
c4037e0
Increase travis timeout time
buger Aug 18, 2015
78456d6
More timeout
buger Aug 18, 2015
a6e29c6
Close connection
buger Aug 18, 2015
5d2f6ec
Fixes
buger Aug 19, 2015
e54f77f
Try to return all responses
buger Aug 19, 2015
6cb4c26
Some debugging and race fixes
buger Aug 19, 2015
9550b31
Fix plugin registration
buger Aug 19, 2015
33cdcd4
Use httptest package and properly close servers
buger Aug 20, 2015
6974a6e
Ensure that raw input gets closed
buger Aug 20, 2015
3c45595
More race fixes
buger Aug 20, 2015
81d3508
Enable race testing for travis
buger Aug 20, 2015
76d5e22
Enable packet debugging
buger Aug 20, 2015
9309fb8
Reduce debugging
buger Aug 20, 2015
6533b50
Travis should use 1.5
buger Aug 20, 2015
16d7f23
New fixes
buger Aug 22, 2015
cbc27bd
More debugging
buger Aug 22, 2015
02bf7a8
Rollback to 1.4.2 in travis
buger Aug 22, 2015
d5a1eb0
Increase input raw timeout
buger Aug 22, 2015
f3ea30c
Add example of java echo middleware
buger Aug 23, 2015
f0b96af
Add protection for malformed requests
buger Aug 23, 2015
929e027
small formatting change
buger Aug 23, 2015
3c81723
Increase timeout for large payload
buger Aug 24, 2015
fe91a20
fix race
buger Aug 24, 2015
43262f3
Increase timeouts
buger Aug 24, 2015
abde6ec
less verbose output
buger Aug 24, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ FROM google/golang

RUN cd /goroot/src/ && GOOS=linux GOARCH=386 ./make.bash --no-clean

RUN apt-get update && apt-get install ruby vim-common -y

WORKDIR /gopath/src/github.com/buger/gor/

ADD . /gopath/src/github.com/buger/gor/

RUN go get
RUN go get
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
SOURCE = emitter.go gor.go gor_stat.go input_dummy.go input_file.go input_raw.go input_tcp.go limiter.go output_dummy.go output_file.go input_http.go output_http.go output_tcp.go plugins.go settings.go test_input.go elasticsearch.go http_modifier.go http_modifier_settings.go http_client.go
SOURCE = emitter.go gor.go gor_stat.go input_dummy.go input_file.go input_raw.go input_tcp.go limiter.go output_dummy.go output_file.go input_http.go output_http.go output_tcp.go plugins.go settings.go test_input.go elasticsearch.go http_modifier.go http_modifier_settings.go http_client.go traffic_modifier.go

SOURCE_PATH = /gopath/src/github.com/buger/gor/

Expand Down Expand Up @@ -31,7 +31,7 @@ dbench:

# Used mainly for debugging, because docker container do not have access to parent machine ports
drun:
docker run -v `pwd`:$(SOURCE_PATH) -t -i gor go run $(SOURCE) --input-dummy=0 --output-http="http://localhost:9000" --verbose -h
docker run -v `pwd`:/gopath/src/gor -t -i gor go run $(SOURCE) --input-modifier="bash ./examples/echo_modifier.sh" --input-dummy=0 --output-http="http://localhost:9000" --verbose

dbash:
docker run -v `pwd`:$(SOURCE_PATH) -t -i gor /bin/bash
dbash:
docker run -v `pwd`:$(SOURCE_PATH) -t -i gor /bin/bash
1 change: 1 addition & 0 deletions emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func CopyMulty(src io.Reader, writers ...io.Writer) (err error) {

for {
nr, er := src.Read(buf)

if nr > 0 && len(buf) > nr {
payload := buf[0:nr]

Expand Down
16 changes: 16 additions & 0 deletions examples/echo_modifier.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/usr/bin/env ruby
# encoding: utf-8
while data = STDIN.gets
next unless data
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use 2 (not 4) spaces for indentation.

data = data.chomp

decoded = [data].pack("H*")
encoded = decoded.unpack("H*").first

STDOUT.puts encoded


Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra blank line detected.

STDERR.puts "[DEBUG] Original data: #{data}"
STDERR.puts "[DEBUG] Decoded request: #{decoded}"
STDERR.puts "[DEBUG] Encoded data: #{encoded}"
end
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Final newline missing.

10 changes: 10 additions & 0 deletions examples/echo_modifier.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/usr/bin/env bash
while read line; do
decoded=$(echo "$line" | xxd -r -p)
encoded=$(echo "$decoded" | xxd -p | tr -d "\\n")
echo "$encoded"

>&2 echo "[DEBUG] Original data: $line"
>&2 echo "[DEBUG] Decoded request: $decoded"
>&2 echo "[DEBUG] Encoded data: $encoded"
done;
2 changes: 1 addition & 1 deletion input_dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (i *DummyInput) emit() {
for {
select {
case <-ticker.C:
i.data <- []byte("GET / HTTP/1.1\r\n\r\n")
i.data <- []byte("POST /pub/WWW/å HTTP/1.1\nHost: www.w3.org\r\n\r\na=1&b=2")
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions input_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ func (i *RAWInput) Read(data []byte) (int, error) {
func (i *RAWInput) listen(address string) {
address = strings.Replace(address, "[::]", "127.0.0.1", -1)

Debug("Listening for traffic on: " + address)

host, port, err := net.SplitHostPort(address)

if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions input_raw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestRAWInput(t *testing.T) {
wg := new(sync.WaitGroup)
quit := make(chan int)

listener := startHTTP(func(req *http.Request) {})
listener := startHTTP(func(w http.ResponseWriter, req *http.Request) {})

input := NewRAWInput(listener.Addr().String())
output := NewTestOutput(func(data []byte) {
Expand Down Expand Up @@ -50,7 +50,7 @@ func TestInputRAW100Expect(t *testing.T) {
file_content, _ := ioutil.ReadFile("README.md")

// Origing and Replay server initialization
origin := startHTTP(func(req *http.Request) {
origin := startHTTP(func(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()
ioutil.ReadAll(req.Body)

Expand All @@ -69,7 +69,7 @@ func TestInputRAW100Expect(t *testing.T) {
wg.Done()
})

listener := startHTTP(func(req *http.Request) {
listener := startHTTP(func(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()
body, _ := ioutil.ReadAll(req.Body)

Expand Down Expand Up @@ -107,7 +107,7 @@ func TestInputRAWChunkedEncoding(t *testing.T) {
file_content, _ := ioutil.ReadFile("README.md")

// Origing and Replay server initialization
origin := startHTTP(func(req *http.Request) {
origin := startHTTP(func(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()
ioutil.ReadAll(req.Body)

Expand All @@ -118,7 +118,7 @@ func TestInputRAWChunkedEncoding(t *testing.T) {

input := NewRAWInput(origin_address)

listener := startHTTP(func(req *http.Request) {
listener := startHTTP(func(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()
body, _ := ioutil.ReadAll(req.Body)

Expand Down
2 changes: 1 addition & 1 deletion output_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type HTTPOutput struct {
address string
limit int
queue chan []byte
responses chan []byte

needWorker chan int

Expand All @@ -40,7 +41,6 @@ type HTTPOutput struct {
}

func NewHTTPOutput(address string, config *HTTPOutputConfig) io.Writer {

o := new(HTTPOutput)

o.address = address
Expand Down
8 changes: 4 additions & 4 deletions output_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (
"time"
)

func startHTTP(cb func(*http.Request)) net.Listener {
func startHTTP(cb func(http.ResponseWriter, *http.Request)) net.Listener {
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
cb(r)
cb(w, r)
})

listener, _ := net.Listen("tcp", ":0")
Expand All @@ -30,7 +30,7 @@ func TestHTTPOutput(t *testing.T) {

input := NewTestInput()

listener := startHTTP(func(req *http.Request) {
listener := startHTTP(func(w http.ResponseWriter, req *http.Request) {
if req.Header.Get("User-Agent") != "Gor" {
t.Error("Wrong header")
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func BenchmarkHTTPOutput(b *testing.B) {
wg := new(sync.WaitGroup)
quit := make(chan int)

listener := startHTTP(func(req *http.Request) {
listener := startHTTP(func(w http.ResponseWriter, req *http.Request) {
time.Sleep(50 * time.Millisecond)
wg.Done()
})
Expand Down
5 changes: 5 additions & 0 deletions plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
type InOutPlugins struct {
Inputs []io.Reader
Outputs []io.Writer

Modifiers []TrafficModifier
}

type ReaderOrWriter interface{}
Expand Down Expand Up @@ -54,6 +56,9 @@ func registerPlugin(constructor interface{}, options ...interface{}) {
}

if _, ok := plugin.(io.Reader); ok {
for _, options := range Settings.inputModifier {
plugin_wrapper = NewTrafficModifier(plugin_wrapper, options)
}
Plugins.Inputs = append(Plugins.Inputs, plugin_wrapper.(io.Reader))
}

Expand Down
4 changes: 4 additions & 0 deletions settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type AppSettings struct {

inputRAW MultiOption

inputModifier MultiOption

inputHTTP MultiOption
outputHTTP MultiOption

Expand Down Expand Up @@ -76,6 +78,8 @@ func init() {

flag.Var(&Settings.inputRAW, "input-raw", "Capture traffic from given port (use RAW sockets and require *sudo* access):\n\t# Capture traffic from 8080 port\n\tgor --input-raw :8080 --output-http staging.com")

flag.Var(&Settings.inputModifier, "input-modifier", "Used for modifying input traffic using external command")

flag.Var(&Settings.inputHTTP, "input-http", "Read requests from HTTP, should be explicitly sent from your application:\n\t# Listen for http on 9000\n\tgor --input-http :9000 --output-http staging.com")

flag.Var(&Settings.outputHTTP, "output-http", "Forwards incoming requests to given http address.\n\t# Redirect all incoming requests to staging.com address \n\tgor --input-raw :80 --output-http http://staging.com")
Expand Down
98 changes: 98 additions & 0 deletions traffic_modifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package main

import (
"bufio"
"encoding/hex"
"fmt"
"io"
"log"
"os"
"os/exec"
"strings"
)

type TrafficModifier struct {
plugin interface{}
command string

data chan []byte

Stdin io.Writer
Stdout io.Reader
}

func NewTrafficModifier(plugin interface{}, command string) io.Reader {
m := new(TrafficModifier)
m.plugin = plugin
m.command = command
m.data = make(chan []byte)

commands := strings.Split(command, " ")
cmd := exec.Command(commands[0], commands[1:]...)

m.Stdout, _ = cmd.StdoutPipe()
m.Stdin, _ = cmd.StdinPipe()
cmd.Stderr = os.Stderr

go m.copy(m.Stdin, m.plugin.(io.Reader))
go m.read(m.Stdout)

go func() {
err := cmd.Start()

if err != nil {
log.Fatal(err)
}
}()

defer cmd.Wait()

return m
}

func (m *TrafficModifier) copy(to io.Writer, from io.Reader) {
buf := make([]byte, 5*1024*1024)
dst := make([]byte, len(buf)*2)

for {
nr, _ := from.Read(buf)
if nr > 0 && len(buf) > nr {
hex.Encode(dst, buf[0:nr])
to.Write(dst[0 : nr*2])
to.Write([]byte("\r\n"))
}
}
}

func (m *TrafficModifier) read(from io.Reader) {
buf := make([]byte, 5*1024*1024)

scanner := bufio.NewScanner(from)

for scanner.Scan() {
bytes := scanner.Bytes()
hex.Decode(buf, bytes)

Debug("Received:", buf[0:len(bytes)/2])

m.data <- buf[0 : len(bytes)/2]
}

if err := scanner.Err(); err != nil {
fmt.Fprintln(os.Stderr, "Traffic modifier command failed:", err)
}

return
}

func (m *TrafficModifier) Read(data []byte) (int, error) {
Debug("Trying to read channel!")
buf := <-m.data
copy(data, buf)

return len(buf), nil
}

func (m *TrafficModifier) String() string {
return fmt.Sprintf("Modifying traffic for %s using '%s' command", m.plugin, m.command)
}
Loading