Skip to content
This repository has been archived by the owner on Jan 10, 2023. It is now read-only.

Variable log uploading #198

Merged
merged 1 commit into from
Nov 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 1 addition & 29 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,14 @@ import (
"fmt"
"os"
"strings"
"time"
"unicode"

"github.com/Netflix/titus-executor/api/netflix/titus"
"gopkg.in/urfave/cli.v1"
)

const (
defaultLogUploadThreshold = 6 * time.Hour
defaultLogUploadCheckInterval = 15 * time.Minute
defaultStdioLogCheckInterval = 1 * time.Minute
defaultLogsTmpDir = "/var/lib/titus-container-logs"
defaultLogsTmpDir = "/var/lib/titus-container-logs"
)

// Config contains the executor configuration
Expand Down Expand Up @@ -45,11 +41,6 @@ type Config struct {
ContainerSSHDUsers cli.StringSlice
EC2AccountID string

KeepLocalFileAfterUpload bool
LogUploadThresholdTime time.Duration
LogUploadCheckInterval time.Duration
StdioLogCheckInterval time.Duration

// CopiedFromHost indicates which environment variables to lift from the current config
copiedFromHostEnv cli.StringSlice
hardCodedEnv cli.StringSlice
Expand Down Expand Up @@ -162,25 +153,6 @@ func NewConfig() (*Config, []cli.Flag) {
Destination: &cfg.EC2AccountID,
EnvVar: "EC2_OWNER_ID",
},
cli.BoolFlag{
Name: "keep-local-file-after-upload",
Destination: &cfg.KeepLocalFileAfterUpload,
},
cli.DurationFlag{
Name: "log-upload-threshold-time",
Value: defaultLogUploadThreshold,
Destination: &cfg.LogUploadThresholdTime,
},
cli.DurationFlag{
Name: "log-upload-check-interval",
Value: defaultLogUploadCheckInterval,
Destination: &cfg.LogUploadCheckInterval,
},
cli.DurationFlag{
Name: "stdio-check-interval",
Value: defaultStdioLogCheckInterval,
Destination: &cfg.StdioLogCheckInterval,
},
cli.StringSliceFlag{
Name: "copied-from-host-env",
Value: &cfg.copiedFromHostEnv,
Expand Down
4 changes: 1 addition & 3 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@ func TestDefaultLogDir(t *testing.T) {
assert.Equal(t, cfg.LogsTmpDir, "/var/lib/titus-container-logs", "Log dir set to unexpected value")
}

func TestDefaultDurations(t *testing.T) {
func TestDefaults(t *testing.T) {
cfg := GetDefaultConfiguration(t, nil)

assert.Equal(t, cfg.Stack, "mainvpc")
assert.Equal(t, cfg.LogUploadThresholdTime, defaultLogUploadThreshold)
assert.Equal(t, cfg.LogUploadCheckInterval, defaultLogUploadCheckInterval)

}

Expand Down
15 changes: 9 additions & 6 deletions executor/mock/jobrunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/Netflix/titus-executor/executor/drivers"
"github.com/Netflix/titus-executor/executor/runner"
"github.com/Netflix/titus-executor/executor/runtime/docker"
runtimeTypes "github.com/Netflix/titus-executor/executor/runtime/types"
"github.com/Netflix/titus-executor/uploader"
protobuf "github.com/golang/protobuf/proto"
"github.com/pborman/uuid"
Expand Down Expand Up @@ -158,7 +159,6 @@ func GenerateConfigs() (*config.Config, *docker.Config) {
if err != nil {
panic(err)
}
cfg.KeepLocalFileAfterUpload = true
cfg.MetatronEnabled = false

dockerCfg, err := docker.GenerateConfiguration(nil)
Expand Down Expand Up @@ -240,12 +240,15 @@ func (jobRunner *JobRunner) StartJob(jobInput *JobInput) *JobRunResponse { // no
NetworkConfigInfo: &titus.ContainerInfo_NetworkConfigInfo{
EniLabel: protobuf.String("1"),
},
IamProfile: protobuf.String("arn:aws:iam::0:role/DefaultContainerRole"),
Capabilities: jobInput.Capabilities,
TitusProvidedEnv: env,
IgnoreLaunchGuard: protobuf.Bool(jobInput.IgnoreLaunchGuard),
PassthroughAttributes: make(map[string]string),
IamProfile: protobuf.String("arn:aws:iam::0:role/DefaultContainerRole"),
Capabilities: jobInput.Capabilities,
TitusProvidedEnv: env,
IgnoreLaunchGuard: protobuf.Bool(jobInput.IgnoreLaunchGuard),
PassthroughAttributes: map[string]string{
runtimeTypes.LogKeepLocalFileAfterUploadParam: "true",
},
}

if p := jobInput.Process; p != nil {
ci.Process = &titus.ContainerInfo_Process{
Entrypoint: p.Entrypoint,
Expand Down
19 changes: 17 additions & 2 deletions executor/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,11 +449,26 @@ func (r *Runner) wasKilled() bool {
}

func (r *Runner) maybeSetupExternalLogger(ctx context.Context, logDir string) error {
var err error
logUploadCheckInterval, err := r.container.GetLogStdioCheckInterval()
if err != nil {
return err
}
logUploadThresholdTime, err := r.container.GetLogUploadThresholdTime()
if err != nil {
return err
}
logStdioCheckInterval, err := r.container.GetLogStdioCheckInterval()
if err != nil {
return err
}
keepLocalLogFileAfterUpload, err := r.container.GetKeepLocalFileAfterUpload()
if err != nil {
return err
}

uploadDir := r.container.UploadDir("logs")
uploadRegex := r.container.TitusInfo.GetLogUploadRegexp()
r.watcher, err = filesystems.NewWatcher(r.metrics, logDir, uploadDir, uploadRegex, r.logUploaders, r.config)
r.watcher, err = filesystems.NewWatcher(r.metrics, logDir, uploadDir, uploadRegex, r.logUploaders, logUploadCheckInterval, logUploadThresholdTime, logStdioCheckInterval, keepLocalLogFileAfterUpload)
if err != nil {
return err
}
Expand Down
82 changes: 81 additions & 1 deletion executor/runtime/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import "fmt"

import (
"context"
"errors"
"os/exec"
"path/filepath"
"sort"
"strconv"
"strings"
"time"

"github.com/Netflix/titus-executor/api/netflix/titus"
"github.com/Netflix/titus-executor/config"
Expand All @@ -20,6 +20,7 @@ import (
_ "github.com/Netflix/titus-api-definitions/src/main/proto/netflix/titus"
"github.com/Netflix/titus-executor/executor/dockershellparser"
"github.com/aws/aws-sdk-go/aws/arn"
"github.com/pkg/errors"
)

const (
Expand All @@ -30,6 +31,18 @@ const (
ttyEnabledParam = "titusParameter.agent.ttyEnabled"
)

const (
logUploadThresholdTimeParam = "titusParameter.agent.log.uploadThresholdTime"
logUploadCheckIntervalParam = "titusParameter.agent.log.uploadCheckInterval"
logStdioCheckIntervalParam = "titusParameter.agent.log.stdioCheckInterval"
// LogKeepLocalFileAfterUploadParam is the container attribute to specify whether the log file rotator should delete files after uploading
LogKeepLocalFileAfterUploadParam = "titusParameter.agent.log.keepLocalFileAfterUpload"

defaultLogUploadThresholdTime = 6 * time.Hour
defaultLogUploadCheckInterval = 15 * time.Minute
defaultStdioLogCheckInterval = 1 * time.Minute
)

// ErrMissingIAMRole indicates that the Titus job was submitted without an IAM role
// This is a transition because previously the protobuf had this marked as an optional field
// and it's a temporary measure during protocol evolution.
Expand Down Expand Up @@ -300,6 +313,73 @@ func (c *Container) GetTty() (bool, error) {
return val, nil
}

// GetLogUploadThresholdTime indicates how long since a file was modified before we should upload it and delete it
func (c *Container) GetLogUploadThresholdTime() (time.Duration, error) {
logUploadThresholdTimeStr, ok := c.TitusInfo.GetPassthroughAttributes()[logUploadThresholdTimeParam]
if !ok {
return defaultLogUploadThresholdTime, nil
}
duration, err := time.ParseDuration(logUploadThresholdTimeStr)
if err != nil {
return 0, errors.Wrap(err, "Cannot parse log upload threshold time")
}
// Must be at least 2 * logUploadCheckInterval
logUploadCheckInterval, err := c.GetLogUploadCheckInterval()
if err != nil {
return 0, err
}
if duration <= logUploadCheckInterval*2 {
return 0, fmt.Errorf("Log upload threshold time %s must be at least 2 * %s, the log upload check interval", duration, logUploadCheckInterval)
}
logStdioCheckInterval, err := c.GetLogStdioCheckInterval()
if err != nil {
return 0, err
}
if duration <= logStdioCheckInterval*2 {
return 0, fmt.Errorf("Log upload threshold time %s must be at least 2 * %s, the stdio check interval", duration, logUploadCheckInterval)
}

return duration, nil
}

// GetLogUploadCheckInterval indicates how often we should scan the continers log directory to see if files need to be uploaded
func (c *Container) GetLogUploadCheckInterval() (time.Duration, error) {
logUploadCheckIntervalStr, ok := c.TitusInfo.GetPassthroughAttributes()[logUploadCheckIntervalParam]
if !ok {
return defaultLogUploadCheckInterval, nil
}
duration, err := time.ParseDuration(logUploadCheckIntervalStr)
if err != nil {
return 0, errors.Wrap(err, "Cannot parse log upload check interval")
}
if duration < time.Minute {
return 0, fmt.Errorf("Log upload check interval '%s' must be at least 1 minute", duration)
}
return duration, nil
}

// GetLogStdioCheckInterval indicates how often we should scan the stdio log files to determine whether they should be uploaded
func (c *Container) GetLogStdioCheckInterval() (time.Duration, error) {
logStdioCheckIntervalStr, ok := c.TitusInfo.GetPassthroughAttributes()[logStdioCheckIntervalParam]
if !ok {
return defaultStdioLogCheckInterval, nil
}
duration, err := time.ParseDuration(logStdioCheckIntervalStr)
if err != nil {
return 0, errors.Wrap(err, "Cannot parse log stdio check interval")
}
return duration, nil
}

// GetKeepLocalFileAfterUpload indicates whether or not we should delete log files after uploading them
func (c *Container) GetKeepLocalFileAfterUpload() (bool, error) {
keepLocalFileAfterUploadStr, ok := c.TitusInfo.GetPassthroughAttributes()[LogKeepLocalFileAfterUploadParam]
if !ok {
return false, nil
}
return strconv.ParseBool(keepLocalFileAfterUploadStr)
}

// Resources specify constraints to be applied to a Container
type Resources struct {
Mem int64 // in MiB
Expand Down
21 changes: 10 additions & 11 deletions filesystems/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"time"

"github.com/Netflix/metrics-client-go/metrics"
"github.com/Netflix/titus-executor/config"
"github.com/Netflix/titus-executor/filesystems/xattr"
"github.com/Netflix/titus-executor/uploader"
"github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -75,8 +74,8 @@ type Watcher struct {
uploaders *uploader.Uploaders
// UploadCheckInterval returns how often files are checked if they need to be uploaded
UploadCheckInterval time.Duration
// UploadThreshold returns how long a file must be untouched prior to uploading it
UploadThreshold time.Duration
// UploadThresholdTime returns how long a file must be untouched prior to uploading it
UploadThresholdTime time.Duration
stdioLogCheckInterval time.Duration
keepLocalFileAfterUpload bool
retError error
Expand All @@ -87,16 +86,16 @@ type Watcher struct {
}

// NewWatcher returns a fully instantiated instance of Watcher, which will run until Stop is called.
func NewWatcher(m metrics.Reporter, localDir, uploadDir, uploadRegexpStr string, uploaders *uploader.Uploaders, cfg config.Config) (*Watcher, error) {
func NewWatcher(m metrics.Reporter, localDir, uploadDir, uploadRegexpStr string, uploaders *uploader.Uploaders, uploadCheckInterval time.Duration, uploadThresholdTime time.Duration, stdioLogCheckInterval time.Duration, keepLocalFileAfterUpload bool) (*Watcher, error) {
watcher := &Watcher{
metrics: m,
localDir: localDir,
uploadDir: uploadDir,
uploaders: uploaders,
UploadCheckInterval: cfg.LogUploadCheckInterval,
UploadThreshold: cfg.LogUploadThresholdTime,
stdioLogCheckInterval: cfg.StdioLogCheckInterval,
keepLocalFileAfterUpload: cfg.KeepLocalFileAfterUpload,
UploadCheckInterval: uploadCheckInterval,
UploadThresholdTime: uploadThresholdTime,
stdioLogCheckInterval: stdioLogCheckInterval,
keepLocalFileAfterUpload: keepLocalFileAfterUpload,
}

if uploadRegexpStr != "" {
Expand Down Expand Up @@ -294,7 +293,7 @@ func (w *Watcher) traditionalRotateLoop(parentCtx context.Context, wg *sync.Wait
// Traditional rotate doesn't actually rotate at all
// it goes through a list of files, and checks when they were modified, and based upon that it uploads them and optionally deletes them
func (w *Watcher) traditionalRotate(ctx context.Context) {
logFileList, err := buildFileListInDir(w.localDir, true, w.UploadThreshold)
logFileList, err := buildFileListInDir(w.localDir, true, w.UploadThresholdTime)
if err == nil {
for _, logFile := range logFileList {
w.uploadLogfile(ctx, logFile)
Expand Down Expand Up @@ -427,7 +426,7 @@ func (w *Watcher) doStdioUploadAndReclaimVirtualFile(ctx context.Context, mode s
now := time.Now()
age := now.Sub(creationTime)

if mode == normalRotate && now.Add(-1*w.UploadThreshold).Before(creationTime) {
if mode == normalRotate && now.Add(-1*w.UploadThresholdTime).Before(creationTime) {
log.Debugf("Virtual file %s of real file %s not old enough to upload and discard because only %s old", virtualFileName, file.Name(), age.String())
return
}
Expand Down Expand Up @@ -587,7 +586,7 @@ func parsecurrentOffsetBytes(name string, currentOffsetBytes []byte) int64 {
func (w *Watcher) uploadAllLogFiles(ctx context.Context) error {
var errs *multierror.Error

logFileList, err := buildFileListInDir(w.localDir, false, w.UploadThreshold)
logFileList, err := buildFileListInDir(w.localDir, false, w.UploadThresholdTime)
if err != nil {
w.metrics.Counter("titus.executor.logsUploadError", 1, nil)
log.Printf("Error uploading directory %s : %s\n", w.localDir, err)
Expand Down
2 changes: 1 addition & 1 deletion filesystems/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func makeWatcher(localDir, uploadDir string) *Watcher {
uploadDir: uploadDir,
uploadRegexp: nil,
uploaders: uploaders,
UploadThreshold: time.Duration(time.Second * 10),
UploadThresholdTime: time.Duration(time.Second * 10),
UploadCheckInterval: time.Duration(time.Second * 2),
stdioLogCheckInterval: time.Duration(time.Second * 2),
keepLocalFileAfterUpload: false,
Expand Down