Skip to content

Commit

Permalink
Use JSON logging in App Engine Flexible.
Browse files Browse the repository at this point in the history
Change-Id: I8a94e15496b3e111a8167a75a515fc90d23102f6
  • Loading branch information
Adam Tanner committed May 13, 2016
1 parent e234e71 commit 5803211
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 199 deletions.
155 changes: 9 additions & 146 deletions internal/api.go
Expand Up @@ -26,8 +26,6 @@ import (
"github.com/golang/protobuf/proto"
netcontext "golang.org/x/net/context"

basepb "google.golang.org/appengine/internal/base"
logpb "google.golang.org/appengine/internal/log"
remotepb "google.golang.org/appengine/internal/remote_api"
)

Expand All @@ -52,7 +50,6 @@ var (
apiDeadlineHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline")
apiContentType = http.CanonicalHeaderKey("Content-Type")
apiContentTypeValue = []string{"application/octet-stream"}
logFlushHeader = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count")

apiHTTPClient = &http.Client{
Transport: &http.Transport{
Expand Down Expand Up @@ -82,8 +79,8 @@ func handleHTTP(w http.ResponseWriter, r *http.Request) {
req: r,
outHeader: w.Header(),
apiURL: apiURL(),
logger: globalLogger(),
}
stopFlushing := make(chan int)

ctxs.Lock()
ctxs.m[r] = c
Expand Down Expand Up @@ -112,26 +109,9 @@ func handleHTTP(w http.ResponseWriter, r *http.Request) {
r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80")
}

// Start goroutine responsible for flushing app logs.
// This is done after adding c to ctx.m (and stopped before removing it)
// because flushing logs requires making an API call.
go c.logFlusher(stopFlushing)

executeRequestSafely(c, r)
c.outHeader = nil // make sure header changes aren't respected any more

stopFlushing <- 1 // any logging beyond this point will be dropped

// Flush any pending logs asynchronously.
c.pendingLogs.Lock()
flushes := c.pendingLogs.flushes
if len(c.pendingLogs.lines) > 0 {
flushes++
}
c.pendingLogs.Unlock()
go c.flushLog(false)
w.Header().Set(logFlushHeader, strconv.Itoa(flushes))

// Avoid nil Write call if c.Write is never called.
if c.outCode != 0 {
w.WriteHeader(c.outCode)
Expand Down Expand Up @@ -206,18 +186,13 @@ var ctxs = struct {
// context represents the context of an in-flight HTTP request.
// It implements the appengine.Context and http.ResponseWriter interfaces.
type context struct {
req *http.Request
req *http.Request
logger *jsonLogger

outCode int
outHeader http.Header
outBody []byte

pendingLogs struct {
sync.Mutex
lines []*logpb.UserAppLogLine
flushes int
}

apiURL *url.URL
}

Expand Down Expand Up @@ -290,11 +265,9 @@ func BackgroundContext() netcontext.Context {
},
},
apiURL: apiURL(),
logger: globalLogger(),
}

// TODO(dsymonds): Wire up the shutdown handler to do a final flush.
go ctxs.bg.logFlusher(make(chan int))

return toContext(ctxs.bg)
}

Expand All @@ -306,6 +279,7 @@ func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netco
c := &context{
req: req,
apiURL: apiURL,
logger: globalLogger(),
}
ctxs.Lock()
defer ctxs.Unlock()
Expand Down Expand Up @@ -521,120 +495,9 @@ func (c *context) Request() *http.Request {
return c.req
}

func (c *context) addLogLine(ll *logpb.UserAppLogLine) {
// Truncate long log lines.
// TODO(dsymonds): Check if this is still necessary.
const lim = 8 << 10
if len(*ll.Message) > lim {
suffix := fmt.Sprintf("...(length %d)", len(*ll.Message))
ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix)
}

c.pendingLogs.Lock()
c.pendingLogs.lines = append(c.pendingLogs.lines, ll)
c.pendingLogs.Unlock()
}

var logLevelName = map[int64]string{
0: "DEBUG",
1: "INFO",
2: "WARNING",
3: "ERROR",
4: "CRITICAL",
}

func logf(c *context, level int64, format string, args ...interface{}) {
s := fmt.Sprintf(format, args...)
s = strings.TrimRight(s, "\n") // Remove any trailing newline characters.
c.addLogLine(&logpb.UserAppLogLine{
TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3),
Level: &level,
Message: &s,
})
log.Print(logLevelName[level] + ": " + s)
}

// flushLog attempts to flush any pending logs to the appserver.
// It should not be called concurrently.
func (c *context) flushLog(force bool) (flushed bool) {
c.pendingLogs.Lock()
// Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious.
n, rem := 0, 30<<20
for ; n < len(c.pendingLogs.lines); n++ {
ll := c.pendingLogs.lines[n]
// Each log line will require about 3 bytes of overhead.
nb := proto.Size(ll) + 3
if nb > rem {
break
}
rem -= nb
}
lines := c.pendingLogs.lines[:n]
c.pendingLogs.lines = c.pendingLogs.lines[n:]
c.pendingLogs.Unlock()

if len(lines) == 0 && !force {
// Nothing to flush.
return false
}

rescueLogs := false
defer func() {
if rescueLogs {
c.pendingLogs.Lock()
c.pendingLogs.lines = append(lines, c.pendingLogs.lines...)
c.pendingLogs.Unlock()
}
}()

buf, err := proto.Marshal(&logpb.UserAppLogGroup{
LogLine: lines,
})
if err != nil {
log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err)
rescueLogs = true
return false
}

req := &logpb.FlushRequest{
Logs: buf,
}
res := &basepb.VoidProto{}
c.pendingLogs.Lock()
c.pendingLogs.flushes++
c.pendingLogs.Unlock()
if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil {
log.Printf("internal.flushLog: Flush RPC: %v", err)
rescueLogs = true
return false
}
return true
}

const (
// Log flushing parameters.
flushInterval = 1 * time.Second
forceFlushInterval = 60 * time.Second
)

func (c *context) logFlusher(stop <-chan int) {
lastFlush := time.Now()
tick := time.NewTicker(flushInterval)
for {
select {
case <-stop:
// Request finished.
tick.Stop()
return
case <-tick.C:
force := time.Now().Sub(lastFlush) > forceFlushInterval
if c.flushLog(force) {
lastFlush = time.Now()
}
}
}
}

func ContextForTesting(req *http.Request) netcontext.Context {
return toContext(&context{req: req})
return toContext(&context{
req: req,
logger: discardLogger,
})
}
98 changes: 45 additions & 53 deletions internal/api_test.go
Expand Up @@ -9,6 +9,7 @@ package internal
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
Expand All @@ -18,7 +19,6 @@ import (
"os"
"os/exec"
"strings"
"sync/atomic"
"testing"
"time"

Expand All @@ -37,8 +37,6 @@ func init() {

type fakeAPIHandler struct {
hang chan int // used for RunSlowly RPC

LogFlushes int32 // atomic
}

func (f *fakeAPIHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -124,12 +122,6 @@ func (f *fakeAPIHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
resOut = &basepb.VoidProto{}
}
}
if service == "logservice" && method == "Flush" {
// Pretend log flushing is slow.
time.Sleep(50 * time.Millisecond)
atomic.AddInt32(&f.LogFlushes, 1)
resOut = &basepb.VoidProto{}
}

encOut, err := proto.Marshal(resOut)
if err != nil {
Expand Down Expand Up @@ -226,50 +218,6 @@ func TestAPICallDialFailure(t *testing.T) {
}
}

func TestDelayedLogFlushing(t *testing.T) {
f, c, cleanup := setup()
defer cleanup()

http.HandleFunc("/quick_log", func(w http.ResponseWriter, r *http.Request) {
logC := WithContext(netcontext.Background(), r)
fromContext(logC).apiURL = c.apiURL // Otherwise it will try to use the default URL.
Logf(logC, 1, "It's a lovely day.")
w.WriteHeader(200)
w.Write(make([]byte, 100<<10)) // write 100 KB to force HTTP flush
})

r := &http.Request{
Method: "GET",
URL: &url.URL{
Scheme: "http",
Path: "/quick_log",
},
Header: c.req.Header,
Body: ioutil.NopCloser(bytes.NewReader(nil)),
}
w := httptest.NewRecorder()

// Check that log flushing does not hold up the HTTP response.
start := time.Now()
handleHTTP(w, r)
if d := time.Since(start); d > 10*time.Millisecond {
t.Errorf("handleHTTP took %v, want under 10ms", d)
}
const hdr = "X-AppEngine-Log-Flush-Count"
if h := w.HeaderMap.Get(hdr); h != "1" {
t.Errorf("%s header = %q, want %q", hdr, h, "1")
}
if f := atomic.LoadInt32(&f.LogFlushes); f != 0 {
t.Errorf("After HTTP response: f.LogFlushes = %d, want 0", f)
}

// Check that the log flush eventually comes in.
time.Sleep(100 * time.Millisecond)
if f := atomic.LoadInt32(&f.LogFlushes); f != 1 {
t.Errorf("After 100ms: f.LogFlushes = %d, want 1", f)
}
}

func TestRemoteAddr(t *testing.T) {
var addr string
http.HandleFunc("/remote_addr", func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -324,6 +272,50 @@ func TestPanickingHandler(t *testing.T) {
}
}

func TestDelayedLogging(t *testing.T) {
_, c, cleanup := setup()
defer cleanup()

buf := bytes.NewBuffer(nil)
donec := make(chan bool, 1)

http.HandleFunc("/logging", func(w http.ResponseWriter, r *http.Request) {
logC := WithContext(netcontext.Background(), r)
fromContext(logC).logger = newJSONLogger(buf)

time.AfterFunc(200*time.Millisecond, func() {
Logf(logC, 1, "It's a lovely day.")
donec <- true
})

w.WriteHeader(200)
})

r := &http.Request{
Method: "GET",
URL: &url.URL{
Scheme: "http",
Path: "/logging",
},
Header: c.req.Header,
Body: ioutil.NopCloser(bytes.NewReader(nil)),
}
w := httptest.NewRecorder()

handleHTTP(w, r)

<-donec

var line logLine
if err := json.NewDecoder(buf).Decode(&line); err != nil {
t.Fatalf("Failed to unmarshal log line: %v", err)
}

if got, want := line.Message, "It's a lovely day."; got != want {
t.Errorf("line.Message = %s, want %s", got, want)
}
}

var raceDetector = false

func TestAPICallAllocations(t *testing.T) {
Expand Down

0 comments on commit 5803211

Please sign in to comment.