Skip to content

Commit

Permalink
fix: tolerate non-json lines in provisionerd logs (#5006)
Browse files Browse the repository at this point in the history
Co-authored-by: Dean Sheather <dean@deansheather.com>
  • Loading branch information
coadler and deansheather committed Nov 10, 2022
1 parent a25deb9 commit 8c8344c
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 77 deletions.
103 changes: 64 additions & 39 deletions provisioner/terraform/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
tfjson "github.com/hashicorp/terraform-json"
"golang.org/x/xerrors"

"cdr.dev/slog"
"github.com/coder/coder/provisionersdk/proto"
)

Expand Down Expand Up @@ -171,10 +172,12 @@ func versionFromBinaryPath(ctx context.Context, binaryPath string) (*version.Ver
return version.NewVersion(vj.Version)
}

func (e executor) init(ctx, killCtx context.Context, logr logger) error {
func (e executor) init(ctx, killCtx context.Context, logr logSink) error {
outWriter, doneOut := logWriter(logr, proto.LogLevel_DEBUG)
errWriter, doneErr := logWriter(logr, proto.LogLevel_ERROR)
defer func() {
_ = outWriter.Close()
_ = errWriter.Close()
<-doneOut
<-doneErr
}()
Expand All @@ -201,7 +204,7 @@ func (e executor) init(ctx, killCtx context.Context, logr logger) error {
}

// revive:disable-next-line:flag-parameter
func (e executor) plan(ctx, killCtx context.Context, env, vars []string, logr logger, destroy bool) (*proto.Provision_Response, error) {
func (e executor) plan(ctx, killCtx context.Context, env, vars []string, logr logSink, destroy bool) (*proto.Provision_Response, error) {
planfilePath := filepath.Join(e.workdir, "terraform.tfplan")
args := []string{
"plan",
Expand All @@ -221,6 +224,8 @@ func (e executor) plan(ctx, killCtx context.Context, env, vars []string, logr lo
outWriter, doneOut := provisionLogWriter(logr)
errWriter, doneErr := logWriter(logr, proto.LogLevel_ERROR)
defer func() {
_ = outWriter.Close()
_ = errWriter.Close()
<-doneOut
<-doneErr
}()
Expand Down Expand Up @@ -287,7 +292,7 @@ func (e executor) graph(ctx, killCtx context.Context) (string, error) {
}

// revive:disable-next-line:flag-parameter
func (e executor) apply(ctx, killCtx context.Context, env, vars []string, logr logger, destroy bool,
func (e executor) apply(ctx, killCtx context.Context, env, vars []string, logr logSink, destroy bool,
) (*proto.Provision_Response, error) {
args := []string{
"apply",
Expand All @@ -307,6 +312,8 @@ func (e executor) apply(ctx, killCtx context.Context, env, vars []string, logr l
outWriter, doneOut := provisionLogWriter(logr)
errWriter, doneErr := logWriter(logr, proto.LogLevel_ERROR)
defer func() {
_ = outWriter.Close()
_ = errWriter.Close()
<-doneOut
<-doneErr
}()
Expand Down Expand Up @@ -380,86 +387,104 @@ func interruptCommandOnCancel(ctx, killCtx context.Context, cmd *exec.Cmd) {
}()
}

type logger interface {
Log(*proto.Log) error
type logSink interface {
Log(*proto.Log)
}

type streamLogger struct {
type streamLogSink struct {
// Any errors writing to the stream will be logged to logger.
logger slog.Logger
stream proto.DRPCProvisioner_ProvisionStream
}

func (s streamLogger) Log(l *proto.Log) error {
return s.stream.Send(&proto.Provision_Response{
var _ logSink = streamLogSink{}

func (s streamLogSink) Log(l *proto.Log) {
err := s.stream.Send(&proto.Provision_Response{
Type: &proto.Provision_Response_Log{
Log: l,
},
})
if err != nil {
s.logger.Warn(context.Background(), "write log to stream",
slog.F("level", l.Level.String()),
slog.F("message", l.Output),
slog.Error(err),
)
}
}

// logWriter creates a WriteCloser that will log each line of text at the given level. The WriteCloser must be closed
// by the caller to end logging, after which the returned channel will be closed to indicate that logging of the written
// data has finished. Failure to close the WriteCloser will leak a goroutine.
func logWriter(logr logger, level proto.LogLevel) (io.WriteCloser, <-chan any) {
func logWriter(sink logSink, level proto.LogLevel) (io.WriteCloser, <-chan any) {
r, w := io.Pipe()
done := make(chan any)
go readAndLog(logr, r, done, level)
go readAndLog(sink, r, done, level)
return w, done
}

func readAndLog(logr logger, r io.Reader, done chan<- any, level proto.LogLevel) {
func readAndLog(sink logSink, r io.Reader, done chan<- any, level proto.LogLevel) {
defer close(done)
scanner := bufio.NewScanner(r)
for scanner.Scan() {
err := logr.Log(&proto.Log{Level: level, Output: scanner.Text()})
if err != nil {
// Not much we can do. We can't log because logging is itself breaking!
return
}
sink.Log(&proto.Log{Level: level, Output: scanner.Text()})
}
}

// provisionLogWriter creates a WriteCloser that will log each JSON formatted terraform log. The WriteCloser must be
// closed by the caller to end logging, after which the returned channel will be closed to indicate that logging of the
// written data has finished. Failure to close the WriteCloser will leak a goroutine.
func provisionLogWriter(logr logger) (io.WriteCloser, <-chan any) {
func provisionLogWriter(sink logSink) (io.WriteCloser, <-chan any) {
r, w := io.Pipe()
done := make(chan any)
go provisionReadAndLog(logr, r, done)
go provisionReadAndLog(sink, r, done)
return w, done
}

func provisionReadAndLog(logr logger, reader io.Reader, done chan<- any) {
func provisionReadAndLog(sink logSink, r io.Reader, done chan<- any) {
defer close(done)
decoder := json.NewDecoder(reader)
for {
scanner := bufio.NewScanner(r)
for scanner.Scan() {
var log terraformProvisionLog
err := decoder.Decode(&log)
err := json.Unmarshal(scanner.Bytes(), &log)
if err != nil {
return
// Sometimes terraform doesn't log JSON, even though we asked it to.
// The terraform maintainers have said on the issue tracker that
// they don't guarantee that non-JSON lines won't get printed.
// https://github.com/hashicorp/terraform/issues/29252#issuecomment-887710001
//
// > I think as a practical matter it isn't possible for us to
// > promise that the output will always be entirely JSON, because
// > there's plenty of code that runs before command line arguments
// > are parsed and thus before we even know we're in JSON mode.
// > Given that, I'd suggest writing code that consumes streaming
// > JSON output from Terraform in such a way that it can tolerate
// > the output not having JSON in it at all.
//
// Log lines such as:
// - Acquiring state lock. This may take a few moments...
// - Releasing state lock. This may take a few moments...
if strings.TrimSpace(scanner.Text()) == "" {
continue
}
log.Level = "info"
log.Message = scanner.Text()
}
logLevel := convertTerraformLogLevel(log.Level, logr)

err = logr.Log(&proto.Log{Level: logLevel, Output: log.Message})
if err != nil {
// Not much we can do. We can't log because logging is itself breaking!
return
}
logLevel := convertTerraformLogLevel(log.Level, sink)
sink.Log(&proto.Log{Level: logLevel, Output: log.Message})

// If the diagnostic is provided, let's provide a bit more info!
if log.Diagnostic == nil {
continue
}

// If the diagnostic is provided, let's provide a bit more info!
logLevel = convertTerraformLogLevel(log.Diagnostic.Severity, logr)
err = logr.Log(&proto.Log{Level: logLevel, Output: log.Diagnostic.Detail})
if err != nil {
// Not much we can do. We can't log because logging is itself breaking!
return
}
logLevel = convertTerraformLogLevel(log.Diagnostic.Severity, sink)
sink.Log(&proto.Log{Level: logLevel, Output: log.Diagnostic.Detail})
}
}

func convertTerraformLogLevel(logLevel string, logr logger) proto.LogLevel {
func convertTerraformLogLevel(logLevel string, sink logSink) proto.LogLevel {
switch strings.ToLower(logLevel) {
case "trace":
return proto.LogLevel_TRACE
Expand All @@ -472,7 +497,7 @@ func convertTerraformLogLevel(logLevel string, logr logger) proto.LogLevel {
case "error":
return proto.LogLevel_ERROR
default:
_ = logr.Log(&proto.Log{
sink.Log(&proto.Log{
Level: proto.LogLevel_WARN,
Output: fmt.Sprintf("unable to convert log level %s", logLevel),
})
Expand Down
31 changes: 6 additions & 25 deletions provisioner/terraform/executor_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,24 @@ import (
"testing"

"github.com/stretchr/testify/require"
"golang.org/x/xerrors"

"github.com/coder/coder/provisionersdk/proto"
)

type mockLogger struct {
logs []*proto.Log
retVal error
logs []*proto.Log
}

func (m *mockLogger) Log(l *proto.Log) error {
var _ logSink = &mockLogger{}

func (m *mockLogger) Log(l *proto.Log) {
m.logs = append(m.logs, l)
return m.retVal
}

func TestLogWriter_Mainline(t *testing.T) {
t.Parallel()

logr := &mockLogger{retVal: nil}
logr := &mockLogger{}
writer, doneLogging := logWriter(logr, proto.LogLevel_INFO)

_, err := writer.Write([]byte(`Sitting in an English garden
Expand All @@ -40,23 +39,5 @@ From standing in the English rain`))
{Level: proto.LogLevel_INFO, Output: "If the sun don't come you get a tan"},
{Level: proto.LogLevel_INFO, Output: "From standing in the English rain"},
}
require.Equal(t, logr.logs, expected)
}

func TestLogWriter_SendError(t *testing.T) {
t.Parallel()

logr := &mockLogger{retVal: xerrors.New("Goo goo g'joob")}
writer, doneLogging := logWriter(logr, proto.LogLevel_INFO)

_, err := writer.Write([]byte(`Sitting in an English garden
Waiting for the sun
If the sun don't come you get a tan
From standing in the English rain`))
require.NoError(t, err)
err = writer.Close()
require.NoError(t, err)
<-doneLogging
expected := []*proto.Log{{Level: proto.LogLevel_INFO, Output: "Sitting in an English garden"}}
require.Equal(t, logr.logs, expected)
require.Equal(t, expected, logr.logs)
}
23 changes: 10 additions & 13 deletions provisioner/terraform/provision.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,17 @@ func (s *server) Provision(stream proto.DRPCProvisioner_ProvisionStream) error {
}
}()

logr := streamLogger{stream: stream}
sink := streamLogSink{
logger: s.logger.Named("execution_logs"),
stream: stream,
}
start := request.GetStart()

e := s.executor(start.Directory)
if err = e.checkMinVersion(ctx); err != nil {
return err
}
if err = logTerraformEnvVars(logr); err != nil {
return err
}
logTerraformEnvVars(sink)

statefilePath := filepath.Join(start.Directory, "terraform.tfstate")
if len(start.State) > 0 {
Expand Down Expand Up @@ -111,7 +112,7 @@ func (s *server) Provision(stream proto.DRPCProvisioner_ProvisionStream) error {
}

s.logger.Debug(ctx, "running initialization")
err = e.init(ctx, killCtx, logr)
err = e.init(ctx, killCtx, sink)
if err != nil {
if ctx.Err() != nil {
return stream.Send(&proto.Provision_Response{
Expand All @@ -136,10 +137,10 @@ func (s *server) Provision(stream proto.DRPCProvisioner_ProvisionStream) error {
}
var resp *proto.Provision_Response
if start.DryRun {
resp, err = e.plan(ctx, killCtx, env, vars, logr,
resp, err = e.plan(ctx, killCtx, env, vars, sink,
start.Metadata.WorkspaceTransition == proto.WorkspaceTransition_DESTROY)
} else {
resp, err = e.apply(ctx, killCtx, env, vars, logr,
resp, err = e.apply(ctx, killCtx, env, vars, sink,
start.Metadata.WorkspaceTransition == proto.WorkspaceTransition_DESTROY)
}
if err != nil {
Expand Down Expand Up @@ -231,7 +232,7 @@ var (
}
)

func logTerraformEnvVars(logr logger) error {
func logTerraformEnvVars(sink logSink) {
env := safeEnviron()
for _, e := range env {
if strings.HasPrefix(e, "TF_") {
Expand All @@ -242,14 +243,10 @@ func logTerraformEnvVars(logr logger) error {
if !tfEnvSafeToPrint[parts[0]] {
parts[1] = "<value redacted>"
}
err := logr.Log(&proto.Log{
sink.Log(&proto.Log{
Level: proto.LogLevel_WARN,
Output: fmt.Sprintf("terraform environment variable: %s=%s", parts[0], parts[1]),
})
if err != nil {
return err
}
}
}
return nil
}

0 comments on commit 8c8344c

Please sign in to comment.