Skip to content

Commit

Permalink
Merge pull request #20002 from twistlock/19575_authz_plugin_support_e…
Browse files Browse the repository at this point in the history
…vents

Fix 19575: Docker events doesn't work with authorization plugin
  • Loading branch information
LK4D4 committed Feb 8, 2016
2 parents e6573a5 + 5ffc810 commit 862f073
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 9 deletions.
70 changes: 70 additions & 0 deletions integration-cli/docker_cli_authz_unix_test.go
Expand Up @@ -11,10 +11,15 @@ import (
"os"
"strings"

"bufio"
"bytes"
"github.com/docker/docker/pkg/authorization"
"github.com/docker/docker/pkg/integration/checker"
"github.com/docker/docker/pkg/plugins"
"github.com/go-check/check"
"os/exec"
"strconv"
"time"
)

const (
Expand Down Expand Up @@ -221,6 +226,71 @@ func (s *DockerAuthzSuite) TestAuthZPluginDenyResponse(c *check.C) {
c.Assert(res, check.Equals, fmt.Sprintf("Error response from daemon: authorization denied by plugin %s: %s\n", testAuthZPlugin, unauthorizedMessage))
}

// TestAuthZPluginAllowEventStream verifies event stream propogates correctly after request pass through by the authorization plugin
func (s *DockerAuthzSuite) TestAuthZPluginAllowEventStream(c *check.C) {
testRequires(c, DaemonIsLinux)

// Start the authorization plugin
err := s.d.Start("--authorization-plugin=" + testAuthZPlugin)
c.Assert(err, check.IsNil)
s.ctrl.reqRes.Allow = true
s.ctrl.resRes.Allow = true

startTime := strconv.FormatInt(daemonTime(c).Unix(), 10)
// Add another command to to enable event pipelining
eventsCmd := exec.Command(s.d.cmd.Path, "--host", s.d.sock(), "events", "--since", startTime)
stdout, err := eventsCmd.StdoutPipe()
if err != nil {
c.Assert(err, check.IsNil)
}

observer := eventObserver{
buffer: new(bytes.Buffer),
command: eventsCmd,
scanner: bufio.NewScanner(stdout),
startTime: startTime,
}

err = observer.Start()
c.Assert(err, checker.IsNil)
defer observer.Stop()

// Create a container and wait for the creation events
_, err = s.d.Cmd("pull", "busybox")
c.Assert(err, check.IsNil)
out, err := s.d.Cmd("run", "-d", "busybox", "top")
c.Assert(err, check.IsNil)

containerID := strings.TrimSpace(out)

events := map[string]chan bool{
"create": make(chan bool),
"start": make(chan bool),
}

matcher := matchEventLine(containerID, "container", events)
processor := processEventMatch(events)
go observer.Match(matcher, processor)

// Ensure all events are received
for event, eventChannel := range events {

select {
case <-time.After(5 * time.Second):
// Fail the test
observer.CheckEventError(c, containerID, event, matcher)
c.FailNow()
case <-eventChannel:
// Ignore, event received
}
}

// Ensure both events and container endpoints are passed to the authorization plugin
assertURIRecorded(c, s.ctrl.requestsURIs, "/events")
assertURIRecorded(c, s.ctrl.requestsURIs, "/containers/create")
assertURIRecorded(c, s.ctrl.requestsURIs, fmt.Sprintf("/containers/%s/start", containerID))
}

func (s *DockerAuthzSuite) TestAuthZPluginErrorResponse(c *check.C) {
err := s.d.Start("--authorization-plugin=" + testAuthZPlugin)
c.Assert(err, check.IsNil)
Expand Down
2 changes: 1 addition & 1 deletion pkg/authorization/authz.go
Expand Up @@ -116,7 +116,7 @@ func (ctx *Ctx) AuthZResponse(rm ResponseModifier, r *http.Request) error {
}
}

rm.Flush()
rm.FlushAll()

return nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/authorization/authz_test.go
Expand Up @@ -118,7 +118,7 @@ func TestResponseModifier(t *testing.T) {
m.Write([]byte("body"))
m.WriteHeader(500)

m.Flush()
m.FlushAll()
if r.Header().Get("h1") != "v1" {
t.Fatalf("Header value must exists %s", r.Header().Get("h1"))
}
Expand Down Expand Up @@ -147,7 +147,7 @@ func TestResponseModifierOverride(t *testing.T) {
m.OverrideHeader(overrideHeaderBytes)
m.OverrideBody([]byte("override body"))
m.OverrideStatusCode(404)
m.Flush()
m.FlushAll()
if r.Header().Get("h1") != "v2" {
t.Fatalf("Header value must exists %s", r.Header().Get("h1"))
}
Expand Down
79 changes: 73 additions & 6 deletions pkg/authorization/response.go
Expand Up @@ -5,13 +5,16 @@ import (
"bytes"
"encoding/json"
"fmt"
"github.com/Sirupsen/logrus"
"net"
"net/http"
)

// ResponseModifier allows authorization plugins to read and modify the content of the http.response
type ResponseModifier interface {
http.ResponseWriter
http.Flusher
http.CloseNotifier

// RawBody returns the current http content
RawBody() []byte
Expand All @@ -32,7 +35,10 @@ type ResponseModifier interface {
OverrideStatusCode(statusCode int)

// Flush flushes all data to the HTTP response
Flush() error
FlushAll() error

// Hijacked indicates the response has been hijacked by the Docker daemon
Hijacked() bool
}

// NewResponseModifier creates a wrapper to an http.ResponseWriter to allow inspecting and modifying the content
Expand All @@ -44,23 +50,45 @@ func NewResponseModifier(rw http.ResponseWriter) ResponseModifier {
// the http request/response from docker daemon
type responseModifier struct {
// The original response writer
rw http.ResponseWriter
rw http.ResponseWriter

r *http.Request

status int
// body holds the response body
body []byte
// header holds the response header
header http.Header
// statusCode holds the response status code
statusCode int
// hijacked indicates the request has been hijacked
hijacked bool
}

func (rm *responseModifier) Hijacked() bool {
return rm.hijacked
}

// WriterHeader stores the http status code
func (rm *responseModifier) WriteHeader(s int) {

// Use original request if hijacked
if rm.hijacked {
rm.rw.WriteHeader(s)
return
}

rm.statusCode = s
}

// Header returns the internal http header
func (rm *responseModifier) Header() http.Header {

// Use original header if hijacked
if rm.hijacked {
return rm.rw.Header()
}

return rm.header
}

Expand Down Expand Up @@ -90,6 +118,11 @@ func (rm *responseModifier) OverrideHeader(b []byte) error {

// Write stores the byte array inside content
func (rm *responseModifier) Write(b []byte) (int, error) {

if rm.hijacked {
return rm.rw.Write(b)
}

rm.body = append(rm.body, b...)
return len(b), nil
}
Expand All @@ -109,15 +142,41 @@ func (rm *responseModifier) RawHeaders() ([]byte, error) {

// Hijack returns the internal connection of the wrapped http.ResponseWriter
func (rm *responseModifier) Hijack() (net.Conn, *bufio.ReadWriter, error) {

rm.hijacked = true
rm.FlushAll()

hijacker, ok := rm.rw.(http.Hijacker)
if !ok {
return nil, nil, fmt.Errorf("Internal reponse writer doesn't support the Hijacker interface")
}
return hijacker.Hijack()
}

// Flush flushes all data to the HTTP response
func (rm *responseModifier) Flush() error {
// CloseNotify uses the internal close notify API of the wrapped http.ResponseWriter
func (rm *responseModifier) CloseNotify() <-chan bool {
closeNotifier, ok := rm.rw.(http.CloseNotifier)
if !ok {
logrus.Errorf("Internal reponse writer doesn't support the CloseNotifier interface")
return nil
}
return closeNotifier.CloseNotify()
}

// Flush uses the internal flush API of the wrapped http.ResponseWriter
func (rm *responseModifier) Flush() {
flusher, ok := rm.rw.(http.Flusher)
if !ok {
logrus.Errorf("Internal reponse writer doesn't support the Flusher interface")
return
}

rm.FlushAll()
flusher.Flush()
}

// FlushAll flushes all data to the HTTP response
func (rm *responseModifier) FlushAll() error {
// Copy the status code
if rm.statusCode > 0 {
rm.rw.WriteHeader(rm.statusCode)
Expand All @@ -130,7 +189,15 @@ func (rm *responseModifier) Flush() error {
}
}

// Write body
_, err := rm.rw.Write(rm.body)
var err error
if len(rm.body) > 0 {
// Write body
_, err = rm.rw.Write(rm.body)
}

// Clean previous data
rm.body = nil
rm.statusCode = 0
rm.header = http.Header{}
return err
}

0 comments on commit 862f073

Please sign in to comment.