Skip to content
This repository has been archived by the owner on Jan 10, 2023. It is now read-only.

Commit

Permalink
Log Uploader: Remove dependency on config
Browse files Browse the repository at this point in the history
 Make it so that the log uploader can be configured from the container attributes
  • Loading branch information
sargun committed Nov 14, 2018
1 parent 5faff49 commit 85600c8
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 53 deletions.
30 changes: 1 addition & 29 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,14 @@ import (
"fmt"
"os"
"strings"
"time"
"unicode"

"github.com/Netflix/titus-executor/api/netflix/titus"
"gopkg.in/urfave/cli.v1"
)

const (
defaultLogUploadThreshold = 6 * time.Hour
defaultLogUploadCheckInterval = 15 * time.Minute
defaultStdioLogCheckInterval = 1 * time.Minute
defaultLogsTmpDir = "/var/lib/titus-container-logs"
defaultLogsTmpDir = "/var/lib/titus-container-logs"
)

// Config contains the executor configuration
Expand Down Expand Up @@ -45,11 +41,6 @@ type Config struct {
ContainerSSHDUsers cli.StringSlice
EC2AccountID string

KeepLocalFileAfterUpload bool
LogUploadThresholdTime time.Duration
LogUploadCheckInterval time.Duration
StdioLogCheckInterval time.Duration

// CopiedFromHost indicates which environment variables to lift from the current config
copiedFromHostEnv cli.StringSlice
hardCodedEnv cli.StringSlice
Expand Down Expand Up @@ -162,25 +153,6 @@ func NewConfig() (*Config, []cli.Flag) {
Destination: &cfg.EC2AccountID,
EnvVar: "EC2_OWNER_ID",
},
cli.BoolFlag{
Name: "keep-local-file-after-upload",
Destination: &cfg.KeepLocalFileAfterUpload,
},
cli.DurationFlag{
Name: "log-upload-threshold-time",
Value: defaultLogUploadThreshold,
Destination: &cfg.LogUploadThresholdTime,
},
cli.DurationFlag{
Name: "log-upload-check-interval",
Value: defaultLogUploadCheckInterval,
Destination: &cfg.LogUploadCheckInterval,
},
cli.DurationFlag{
Name: "stdio-check-interval",
Value: defaultStdioLogCheckInterval,
Destination: &cfg.StdioLogCheckInterval,
},
cli.StringSliceFlag{
Name: "copied-from-host-env",
Value: &cfg.copiedFromHostEnv,
Expand Down
4 changes: 1 addition & 3 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@ func TestDefaultLogDir(t *testing.T) {
assert.Equal(t, cfg.LogsTmpDir, "/var/lib/titus-container-logs", "Log dir set to unexpected value")
}

func TestDefaultDurations(t *testing.T) {
func TestDefaults(t *testing.T) {
cfg := GetDefaultConfiguration(t, nil)

assert.Equal(t, cfg.Stack, "mainvpc")
assert.Equal(t, cfg.LogUploadThresholdTime, defaultLogUploadThreshold)
assert.Equal(t, cfg.LogUploadCheckInterval, defaultLogUploadCheckInterval)

}

Expand Down
15 changes: 9 additions & 6 deletions executor/mock/jobrunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/Netflix/titus-executor/executor/drivers"
"github.com/Netflix/titus-executor/executor/runner"
"github.com/Netflix/titus-executor/executor/runtime/docker"
runtimeTypes "github.com/Netflix/titus-executor/executor/runtime/types"
"github.com/Netflix/titus-executor/uploader"
protobuf "github.com/golang/protobuf/proto"
"github.com/pborman/uuid"
Expand Down Expand Up @@ -158,7 +159,6 @@ func GenerateConfigs() (*config.Config, *docker.Config) {
if err != nil {
panic(err)
}
cfg.KeepLocalFileAfterUpload = true
cfg.MetatronEnabled = false

dockerCfg, err := docker.GenerateConfiguration(nil)
Expand Down Expand Up @@ -240,12 +240,15 @@ func (jobRunner *JobRunner) StartJob(jobInput *JobInput) *JobRunResponse { // no
NetworkConfigInfo: &titus.ContainerInfo_NetworkConfigInfo{
EniLabel: protobuf.String("1"),
},
IamProfile: protobuf.String("arn:aws:iam::0:role/DefaultContainerRole"),
Capabilities: jobInput.Capabilities,
TitusProvidedEnv: env,
IgnoreLaunchGuard: protobuf.Bool(jobInput.IgnoreLaunchGuard),
PassthroughAttributes: make(map[string]string),
IamProfile: protobuf.String("arn:aws:iam::0:role/DefaultContainerRole"),
Capabilities: jobInput.Capabilities,
TitusProvidedEnv: env,
IgnoreLaunchGuard: protobuf.Bool(jobInput.IgnoreLaunchGuard),
PassthroughAttributes: map[string]string{
runtimeTypes.LogKeepLocalFileAfterUploadParam: "true",
},
}

if p := jobInput.Process; p != nil {
ci.Process = &titus.ContainerInfo_Process{
Entrypoint: p.Entrypoint,
Expand Down
19 changes: 17 additions & 2 deletions executor/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,11 +449,26 @@ func (r *Runner) wasKilled() bool {
}

func (r *Runner) maybeSetupExternalLogger(ctx context.Context, logDir string) error {
var err error
logUploadCheckInterval, err := r.container.GetLogStdioCheckInterval()
if err != nil {
return err
}
logUploadThresholdTime, err := r.container.GetLogUploadThresholdTime()
if err != nil {
return err
}
logStdioCheckInterval, err := r.container.GetLogStdioCheckInterval()
if err != nil {
return err
}
keepLocalLogFileAfterUpload, err := r.container.GetKeepLocalFileAfterUpload()
if err != nil {
return err
}

uploadDir := r.container.UploadDir("logs")
uploadRegex := r.container.TitusInfo.GetLogUploadRegexp()
r.watcher, err = filesystems.NewWatcher(r.metrics, logDir, uploadDir, uploadRegex, r.logUploaders, r.config)
r.watcher, err = filesystems.NewWatcher(r.metrics, logDir, uploadDir, uploadRegex, r.logUploaders, logUploadCheckInterval, logUploadThresholdTime, logStdioCheckInterval, keepLocalLogFileAfterUpload)
if err != nil {
return err
}
Expand Down
82 changes: 81 additions & 1 deletion executor/runtime/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import "fmt"

import (
"context"
"errors"
"os/exec"
"path/filepath"
"sort"
"strconv"
"strings"
"time"

"github.com/Netflix/titus-executor/api/netflix/titus"
"github.com/Netflix/titus-executor/config"
Expand All @@ -20,6 +20,7 @@ import (
_ "github.com/Netflix/titus-api-definitions/src/main/proto/netflix/titus"
"github.com/Netflix/titus-executor/executor/dockershellparser"
"github.com/aws/aws-sdk-go/aws/arn"
"github.com/pkg/errors"
)

const (
Expand All @@ -30,6 +31,18 @@ const (
ttyEnabledParam = "titusParameter.agent.ttyEnabled"
)

// Passthrough-attribute keys that tune the log uploader for a container, plus the
// defaults used when the corresponding attribute is not set.
const (
// logUploadThresholdTimeParam is the passthrough attribute read by GetLogUploadThresholdTime.
logUploadThresholdTimeParam = "titusParameter.agent.log.uploadThresholdTime"
// logUploadCheckIntervalParam is the passthrough attribute read by GetLogUploadCheckInterval.
logUploadCheckIntervalParam = "titusParameter.agent.log.uploadCheckInterval"
// logStdioCheckIntervalParam is the passthrough attribute read by GetLogStdioCheckInterval.
logStdioCheckIntervalParam = "titusParameter.agent.log.stdioCheckInterval"
// LogKeepLocalFileAfterUploadParam is the container attribute to specify whether the log file rotator should delete files after uploading
LogKeepLocalFileAfterUploadParam = "titusParameter.agent.log.keepLocalFileAfterUpload"

// defaultLogUploadThresholdTime is returned by GetLogUploadThresholdTime when the attribute is absent.
defaultLogUploadThresholdTime = 6 * time.Hour
// defaultLogUploadCheckInterval is returned by GetLogUploadCheckInterval when the attribute is absent.
defaultLogUploadCheckInterval = 15 * time.Minute
// defaultStdioLogCheckInterval is returned by GetLogStdioCheckInterval when the attribute is absent.
defaultStdioLogCheckInterval = 1 * time.Minute
)

// ErrMissingIAMRole indicates that the Titus job was submitted without an IAM role
// This is a transition because previously the protobuf had this marked as an optional field
// and it's a temporary measure during protocol evolution.
Expand Down Expand Up @@ -300,6 +313,73 @@ func (c *Container) GetTty() (bool, error) {
return val, nil
}

// GetLogUploadThresholdTime returns how long a log file must have gone unmodified
// before it becomes eligible for upload.
//
// The value is read from the logUploadThresholdTimeParam passthrough attribute and
// parsed with time.ParseDuration. When the attribute is absent,
// defaultLogUploadThresholdTime is returned without any further validation.
//
// An explicitly configured value is validated against both the log upload check
// interval and the stdio check interval. An error is returned when the attribute
// cannot be parsed, when either interval cannot be resolved, or when the threshold
// is too small relative to either interval.
func (c *Container) GetLogUploadThresholdTime() (time.Duration, error) {
	logUploadThresholdTimeStr, ok := c.TitusInfo.GetPassthroughAttributes()[logUploadThresholdTimeParam]
	if !ok {
		return defaultLogUploadThresholdTime, nil
	}

	duration, err := time.ParseDuration(logUploadThresholdTimeStr)
	if err != nil {
		return 0, errors.Wrap(err, "Cannot parse log upload threshold time")
	}

	// NOTE(review): both comparisons below use <=, so a threshold of exactly
	// 2 * interval is rejected even though the messages say "at least".
	// Confirm which of the two is intended before relaxing the check.

	// Must be at least 2 * logUploadCheckInterval
	logUploadCheckInterval, err := c.GetLogUploadCheckInterval()
	if err != nil {
		return 0, err
	}
	if duration <= logUploadCheckInterval*2 {
		return 0, fmt.Errorf("Log upload threshold time %s must be at least 2 * %s, the log upload check interval", duration, logUploadCheckInterval)
	}

	// Must be at least 2 * logStdioCheckInterval
	logStdioCheckInterval, err := c.GetLogStdioCheckInterval()
	if err != nil {
		return 0, err
	}
	if duration <= logStdioCheckInterval*2 {
		// Report the stdio interval that was actually compared, not the upload check interval.
		return 0, fmt.Errorf("Log upload threshold time %s must be at least 2 * %s, the stdio check interval", duration, logStdioCheckInterval)
	}

	return duration, nil
}

// GetLogUploadCheckInterval returns how often the container's log directory should be
// scanned for files that need to be uploaded.
//
// The value comes from the logUploadCheckIntervalParam passthrough attribute. When the
// attribute is absent, defaultLogUploadCheckInterval is returned. A configured value
// that cannot be parsed, or that is shorter than one minute, yields an error.
func (c *Container) GetLogUploadCheckInterval() (time.Duration, error) {
	raw, present := c.TitusInfo.GetPassthroughAttributes()[logUploadCheckIntervalParam]
	if !present {
		return defaultLogUploadCheckInterval, nil
	}

	interval, parseErr := time.ParseDuration(raw)
	switch {
	case parseErr != nil:
		return 0, errors.Wrap(parseErr, "Cannot parse log upload check interval")
	case interval < time.Minute:
		return 0, fmt.Errorf("Log upload check interval '%s' must be at least 1 minute", interval)
	}
	return interval, nil
}

// GetLogStdioCheckInterval returns how often the stdio log files should be scanned to
// decide whether they need to be uploaded.
//
// The value comes from the logStdioCheckIntervalParam passthrough attribute. When the
// attribute is absent, defaultStdioLogCheckInterval is returned. A configured value
// that time.ParseDuration rejects yields an error; no minimum is enforced here.
func (c *Container) GetLogStdioCheckInterval() (time.Duration, error) {
	attrs := c.TitusInfo.GetPassthroughAttributes()
	if raw, present := attrs[logStdioCheckIntervalParam]; present {
		interval, parseErr := time.ParseDuration(raw)
		if parseErr != nil {
			return 0, errors.Wrap(parseErr, "Cannot parse log stdio check interval")
		}
		return interval, nil
	}
	return defaultStdioLogCheckInterval, nil
}

// GetKeepLocalFileAfterUpload reports whether local log files should be kept after
// they have been uploaded.
//
// The value comes from the LogKeepLocalFileAfterUploadParam passthrough attribute and is
// parsed with strconv.ParseBool. When the attribute is absent the result is false;
// an unparsable value yields the strconv error unchanged.
func (c *Container) GetKeepLocalFileAfterUpload() (bool, error) {
	attrs := c.TitusInfo.GetPassthroughAttributes()
	if raw, present := attrs[LogKeepLocalFileAfterUploadParam]; present {
		return strconv.ParseBool(raw)
	}
	return false, nil
}

// Resources specify constraints to be applied to a Container
type Resources struct {
Mem int64 // in MiB
Expand Down
21 changes: 10 additions & 11 deletions filesystems/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"time"

"github.com/Netflix/metrics-client-go/metrics"
"github.com/Netflix/titus-executor/config"
"github.com/Netflix/titus-executor/filesystems/xattr"
"github.com/Netflix/titus-executor/uploader"
"github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -75,8 +74,8 @@ type Watcher struct {
uploaders *uploader.Uploaders
// UploadCheckInterval returns how often files are checked if they need to be uploaded
UploadCheckInterval time.Duration
// UploadThreshold returns how long a file must be untouched prior to uploading it
UploadThreshold time.Duration
// UploadThresholdTime returns how long a file must be untouched prior to uploading it
UploadThresholdTime time.Duration
stdioLogCheckInterval time.Duration
keepLocalFileAfterUpload bool
retError error
Expand All @@ -87,16 +86,16 @@ type Watcher struct {
}

// NewWatcher returns a fully instantiated instance of Watcher, which will run until Stop is called.
func NewWatcher(m metrics.Reporter, localDir, uploadDir, uploadRegexpStr string, uploaders *uploader.Uploaders, cfg config.Config) (*Watcher, error) {
func NewWatcher(m metrics.Reporter, localDir, uploadDir, uploadRegexpStr string, uploaders *uploader.Uploaders, uploadCheckInterval time.Duration, uploadThresholdTime time.Duration, stdioLogCheckInterval time.Duration, keepLocalFileAfterUpload bool) (*Watcher, error) {
watcher := &Watcher{
metrics: m,
localDir: localDir,
uploadDir: uploadDir,
uploaders: uploaders,
UploadCheckInterval: cfg.LogUploadCheckInterval,
UploadThreshold: cfg.LogUploadThresholdTime,
stdioLogCheckInterval: cfg.StdioLogCheckInterval,
keepLocalFileAfterUpload: cfg.KeepLocalFileAfterUpload,
UploadCheckInterval: uploadCheckInterval,
UploadThresholdTime: uploadThresholdTime,
stdioLogCheckInterval: stdioLogCheckInterval,
keepLocalFileAfterUpload: keepLocalFileAfterUpload,
}

if uploadRegexpStr != "" {
Expand Down Expand Up @@ -294,7 +293,7 @@ func (w *Watcher) traditionalRotateLoop(parentCtx context.Context, wg *sync.Wait
// Traditional rotate doesn't actually rotate at all
// it goes through a list of files, and checks when they were modified, and based upon that it uploads them and optionally deletes them
func (w *Watcher) traditionalRotate(ctx context.Context) {
logFileList, err := buildFileListInDir(w.localDir, true, w.UploadThreshold)
logFileList, err := buildFileListInDir(w.localDir, true, w.UploadThresholdTime)
if err == nil {
for _, logFile := range logFileList {
w.uploadLogfile(ctx, logFile)
Expand Down Expand Up @@ -427,7 +426,7 @@ func (w *Watcher) doStdioUploadAndReclaimVirtualFile(ctx context.Context, mode s
now := time.Now()
age := now.Sub(creationTime)

if mode == normalRotate && now.Add(-1*w.UploadThreshold).Before(creationTime) {
if mode == normalRotate && now.Add(-1*w.UploadThresholdTime).Before(creationTime) {
log.Debugf("Virtual file %s of real file %s not old enough to upload and discard because only %s old", virtualFileName, file.Name(), age.String())
return
}
Expand Down Expand Up @@ -587,7 +586,7 @@ func parsecurrentOffsetBytes(name string, currentOffsetBytes []byte) int64 {
func (w *Watcher) uploadAllLogFiles(ctx context.Context) error {
var errs *multierror.Error

logFileList, err := buildFileListInDir(w.localDir, false, w.UploadThreshold)
logFileList, err := buildFileListInDir(w.localDir, false, w.UploadThresholdTime)
if err != nil {
w.metrics.Counter("titus.executor.logsUploadError", 1, nil)
log.Printf("Error uploading directory %s : %s\n", w.localDir, err)
Expand Down
2 changes: 1 addition & 1 deletion filesystems/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func makeWatcher(localDir, uploadDir string) *Watcher {
uploadDir: uploadDir,
uploadRegexp: nil,
uploaders: uploaders,
UploadThreshold: time.Duration(time.Second * 10),
UploadThresholdTime: time.Duration(time.Second * 10),
UploadCheckInterval: time.Duration(time.Second * 2),
stdioLogCheckInterval: time.Duration(time.Second * 2),
keepLocalFileAfterUpload: false,
Expand Down

0 comments on commit 85600c8

Please sign in to comment.