Skip to content

Commit

Permalink
Add streaming command support.
Browse files Browse the repository at this point in the history
Add options
- `stream-stdout-in-response`
- `stream-stdout-in-response-on-error`
- `stream-command-kill-grace-period-seconds`

to allow defining webhooks which dynamically stream large content back to the
requestor. This allows the creation of download endpoints from scripts, i.e.
running a `git archive` command or a database dump from a docker container,
without needing to buffer up the original.
  • Loading branch information
wrouesnel committed May 23, 2018
1 parent 20fb3e3 commit 807e96a
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 37 deletions.
1 change: 0 additions & 1 deletion hook/hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,6 @@ type Hook struct {
CaptureCommandOutputOnError bool `json:"include-command-output-in-response-on-error,omitempty"`
StreamCommandStdout bool `json:"stream-stdout-in-response,omitempty"`
StreamCommandStderrOnError bool `json:"stream-stderr-in-response-on-error,omitempty"`
//KillCommandOnWriteError bool `json:"kill-command-on-write-error,omitempty"`
StreamCommandKillGraceSecs float64 `json:"stream-command-kill-grace-period-seconds,omitempty"`
PassEnvironmentToCommand []Argument `json:"pass-environment-to-command,omitempty"`
PassArgumentsToCommand []Argument `json:"pass-arguments-to-command,omitempty"`
Expand Down
22 changes: 16 additions & 6 deletions test/hookecho.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,37 +7,47 @@ import (
"os"
"strings"
"strconv"
"io"
)

func checkPrefix(prefixMap map[string]struct{}, prefix string, arg string) bool {
if _, found := prefixMap[prefix]; found {
fmt.Printf("prefix specified more then once: %s", arg)
os.Exit(-1)
}
prefixMap[prefix] = struct{}{}
return strings.HasPrefix(arg, "stream=")

if strings.HasPrefix(arg, prefix) {
prefixMap[prefix] = struct{}{}
return true
}

return false
}

func main() {
outputStream := os.Stdout
var outputStream io.Writer
outputStream = os.Stdout
seenPrefixes := make(map[string]struct{})
exit_code := 0

for _, arg := range os.Args {
for _, arg := range os.Args[1:] {
if checkPrefix(seenPrefixes, "stream=", arg) {
switch arg {
case "stream=stdout":
outputStream = os.Stdout
case "stream=stderr":
outputStream = os.Stderr
case "stream=both":
outputStream = io.MultiWriter(os.Stdout, os.Stderr)
default:
fmt.Printf("unrecognized stream specification: %s", arg)
os.Exit(-1)
}
} else if checkPrefix(seenPrefixes, "exit=", arg) {
exit_code_str := os.Args[1][5:]
exit_code_str := arg[5:]
var err error
exit_code, err = strconv.Atoi(exit_code_str)
exit_code_conv, err := strconv.Atoi(exit_code_str)
exit_code = exit_code_conv
if err != nil {
fmt.Printf("Exit code %s not an int!", exit_code_str)
os.Exit(-1)
Expand Down
8 changes: 6 additions & 2 deletions test/hooks.json.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -182,18 +182,22 @@
"include-command-output-in-response-on-error": true
},
{
"id": "steam-stdout-in-response",
"id": "stream-stdout-in-response",
"pass-arguments-to-command": [
{
"source": "string",
"name": "exit=0"
},
{
"source": "string",
"name": "stream=both"
}
],
"execute-command": "{{ .Hookecho }}",
"stream-stdout-in-response": true
},
{
"id": "steam-stderr-in-response-on-error",
"id": "stream-stderr-in-response-on-error",
"pass-arguments-to-command": [
{
"source": "string",
Expand Down
6 changes: 4 additions & 2 deletions test/hooks.yaml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,15 @@
execute-command: '{{ .Hookecho }}'
include-command-output-in-response: true
include-command-output-in-response-on-error: true
- id: steam-stdout-in-response
- id: stream-stdout-in-response
execute-command: '{{ .Hookecho }}'
stream-stdout-in-response: true
pass-arguments-to-command:
- source: string
name: exit=0
- id: steam-stderr-in-response-on-error
- source: string
name: stream=both
- id: stream-stderr-in-response-on-error
execute-command: '{{ .Hookecho }}'
stream-stdout-in-response: true
stream-stderr-in-response-on-error: true
Expand Down
107 changes: 107 additions & 0 deletions test/hookstream/hookstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Hook Stream is a simple utility for testing Webhook streaming capability. It spawns a TCP server on execution
// which echos all connections to its stdout until it receives the string EOF.

package main

import (
"fmt"
"os"
"strings"
"strconv"
"io"
"net"
"bufio"
)

func checkPrefix(prefixMap map[string]struct{}, prefix string, arg string) bool {
if _, found := prefixMap[prefix]; found {
fmt.Printf("prefix specified more then once: %s", arg)
os.Exit(-1)
}

if strings.HasPrefix(arg, prefix) {
prefixMap[prefix] = struct{}{}
return true
}

return false
}

func main() {
var outputStream io.Writer
outputStream = os.Stdout
seenPrefixes := make(map[string]struct{})
exit_code := 0

for _, arg := range os.Args[1:] {
if checkPrefix(seenPrefixes, "stream=", arg) {
switch arg {
case "stream=stdout":
outputStream = os.Stdout
case "stream=stderr":
outputStream = os.Stderr
case "stream=both":
outputStream = io.MultiWriter(os.Stdout, os.Stderr)
default:
fmt.Printf("unrecognized stream specification: %s", arg)
os.Exit(-1)
}
} else if checkPrefix(seenPrefixes, "exit=", arg) {
exit_code_str := arg[5:]
var err error
exit_code_conv, err := strconv.Atoi(exit_code_str)
exit_code = exit_code_conv
if err != nil {
fmt.Printf("Exit code %s not an int!", exit_code_str)
os.Exit(-1)
}
}
}

l, err := net.Listen("tcp", "localhost:0")
if err != nil {
fmt.Printf("Error starting tcp server: %v\n", err)
os.Exit(-1)
}
defer l.Close()

// Emit the address of the server
fmt.Printf("%v\n",l.Addr())

manageCh := make(chan struct{})

go func() {
for {
conn, err := l.Accept()
if err != nil {
fmt.Printf("Error accepting connection: %v\n", err)
os.Exit(-1)
}
go handleRequest(manageCh, outputStream, conn)
}
}()

<- manageCh
l.Close()

os.Exit(exit_code)
}

// Handles incoming requests.
func handleRequest(manageCh chan<- struct{}, w io.Writer, conn net.Conn) {
defer conn.Close()
bio := bufio.NewScanner(conn)
for bio.Scan() {
if line := strings.TrimSuffix(bio.Text(), "\n"); line == "EOF" {
// Request program close
select {
case manageCh <- struct{}{}:
// Request sent.
default:
// Already closing
}
break
}
fmt.Fprintf(w, "%s\n", bio.Text())
}
}
52 changes: 26 additions & 26 deletions webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,6 @@ func hookHandler(w http.ResponseWriter, r *http.Request) {

log.Printf("[%s] incoming HTTP request from %s\n", rid, r.RemoteAddr)

for _, responseHeader := range responseHeaders {
w.Header().Set(responseHeader.Name, responseHeader.Value)
}

id := mux.Vars(r)["id"]

matchedHook := matchLoadedHook(id)
Expand Down Expand Up @@ -345,6 +341,7 @@ func hookHandler(w http.ResponseWriter, r *http.Request) {
stdoutRdr, stderrRdr, errCh := handleHook(ctx, matchedHook, rid, &headers, &query, &payload, &body)

if matchedHook.StreamCommandStdout {
log.Printf("[%s] Hook (%s) is a streaming command hook\n", rid, matchedHook.ID)
// Collect stderr to avoid blocking processes and emit it as a string
stderrRdy := make(chan string, 1)
go func() {
Expand All @@ -362,17 +359,18 @@ func hookHandler(w http.ResponseWriter, r *http.Request) {

// Streaming output should commence as soon as the command execution tries to write any data
firstByte := make([]byte,1)
_, err := stdoutRdr.Read(firstByte)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, fbErr := stdoutRdr.Read(firstByte)
if fbErr != nil && fbErr != io.EOF {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Error occurred while trying to read from the process's first byte. Please check your logs for more details.")
}

// Did the process throw an error before we read this byte?
select {
case err := <- errCh:
if err != nil {
log.Printf("[%s] Hook error while reading first byte: %v\n", rid, err)
return
} else if fbErr == io.EOF {
log.Printf("[%s] EOF from hook stdout while reading first byte. Waiting for program exit status\n", rid)
if err := <- errCh; err != nil {
log.Printf("[%s] Hook (%s) returned an error before the first byte. Collecting stderr and failing.\n", rid, matchedHook.ID)
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusInternalServerError)
if matchedHook.StreamCommandStderrOnError {
// Wait for the stderr buffer to finish collecting
Expand All @@ -382,24 +380,22 @@ func hookHandler(w http.ResponseWriter, r *http.Request) {
return
}
} else {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
fmt.Fprintf(w, "Error occurred while executing the hooks command. Please check your logs for more details.")
}
return // Cannot proceed beyond here
}
default:
// no error - continue
// early EOF, but program exited successfully so stream as normal.
}

// Got the first byte (or possibly nothing) successfully. Write the success header, then commence
// streaming.
w.WriteHeader(http.StatusOK)

// Write user success headers
for _, responseHeader := range matchedHook.ResponseHeaders {
w.Header().Set(responseHeader.Name, responseHeader.Value)
}

// Got the first byte (or possibly nothing) successfully. Write the success header, then commence
// streaming.
w.WriteHeader(http.StatusOK)

if _, err := w.Write(firstByte); err != nil {
// Hard fail, client has disconnected or otherwise we can't continue.
msg := fmt.Sprintf("[%s] error while trying to stream first byte: %s", rid, err)
Expand All @@ -418,6 +414,7 @@ func hookHandler(w http.ResponseWriter, r *http.Request) {
log.Printf(msg)

} else {
log.Printf("[%s] Hook (%s) is a conventional command hook\n", rid, matchedHook.ID)
// Don't break the original API and just combine the streams (specifically, kick off two readers which
// break on newlines and the emit that data in temporal order to the output buffer.
out := combinedOutput(stdoutRdr, stderrRdr)
Expand All @@ -426,13 +423,15 @@ func hookHandler(w http.ResponseWriter, r *http.Request) {

err := <-errCh

log.Printf("[%s] got command execution result: %v", rid, err)

if err != nil {
w.WriteHeader(http.StatusInternalServerError)
} else {
w.WriteHeader(http.StatusOK)
for _, responseHeader := range matchedHook.ResponseHeaders {
w.Header().Set(responseHeader.Name, responseHeader.Value)
}
w.WriteHeader(http.StatusOK)
}

if matchedHook.CaptureCommandOutput {
Expand Down Expand Up @@ -584,9 +583,10 @@ body *[]byte) (io.Reader, io.Reader, <-chan error) {

// Spawn a goroutine to wait for the command to end supply errors
go func() {
err := cmd.Wait()
if err != nil {
log.Printf("[%s] error occurred: %+v\n", rid, err)
resultErr := cmd.Wait()
close(doneCh) // Close the doneCh immediately so handlers exit correctly.
if resultErr != nil {
log.Printf("[%s] error occurred: %+v\n", rid, resultErr)
}

for i := range files {
Expand All @@ -601,9 +601,8 @@ body *[]byte) (io.Reader, io.Reader, <-chan error) {

log.Printf("[%s] finished handling %s\n", rid, h.ID)

errCh <- err
errCh <- resultErr
close(errCh)
close(doneCh)
}()

// Spawn a goroutine which checks if the context is ever cancelled, and sends SIGTERM / SIGKILL if it is
Expand All @@ -612,6 +611,7 @@ body *[]byte) (io.Reader, io.Reader, <-chan error) {

select {
case <- ctxDone:
log.Printf("[%s] Context done (request finished) - killing process.", rid)
// AFAIK this works on Win/Mac/Unix - where does it not work?
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
log.Printf("[%s] error sending SIGTERM to process for %s: %s\n", rid, h.ID, err)
Expand Down
4 changes: 4 additions & 0 deletions webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,5 +636,9 @@ env: HOOK_head_commit.timestamp=2013-03-12T08:14:29-07:00
`},
{"don't capture output on error by default", "capture-command-output-on-error-not-by-default", nil, `{}`, false, http.StatusInternalServerError, `Error occurred while executing the hook's command. Please check your logs for more details.`},
{"capture output on error with extra flag set", "capture-command-output-on-error-yes-with-extra-flag", nil, `{}`, false, http.StatusInternalServerError, `arg: exit=1
`},
{"streaming response yields stdout only", "stream-stdout-in-response", nil, `{}`, false, http.StatusOK, `arg: exit=0 stream=both
`},
{"streaming response with an error yields stderr", "stream-stderr-in-response-on-error", nil, `{}`, false, http.StatusInternalServerError, `arg: exit=1 stream=stderr
`},
}

0 comments on commit 807e96a

Please sign in to comment.