Skip to content

Commit

Permalink
Introduce Template-based task logging plugin (flyteorg#144)
Browse files Browse the repository at this point in the history
* New task log interface and template plugin

* Update spark and pytorch plugins

* fix unit tests

* Make tests deterministic

* PR Comments

* unit test sorted

* Ensure deterministic order of task logs

* Use mixed logs for driver pod

* PR Comments
  • Loading branch information
EngHabu committed Jan 20, 2021
1 parent 374cb60 commit f0b3882
Show file tree
Hide file tree
Showing 15 changed files with 2,064 additions and 155 deletions.
42 changes: 32 additions & 10 deletions flyteplugins/go/tasks/logs/config.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,43 @@
package logs

import "github.com/lyft/flyteplugins/go/tasks/config"
import (
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flyteplugins/go/tasks/config"
)

//go:generate pflags LogConfig

// A URI that accepts templates. See: go/tasks/pluginmachinery/tasklog/template.go for available templates.
type TemplateURI = string

// Log plugins configs
type LogConfig struct {
IsCloudwatchEnabled bool `json:"cloudwatch-enabled" pflag:",Enable Cloudwatch Logging"`
CloudwatchRegion string `json:"cloudwatch-region" pflag:",AWS region in which Cloudwatch logs are stored."`
CloudwatchLogGroup string `json:"cloudwatch-log-group" pflag:",Log group to which streams are associated."`

IsKubernetesEnabled bool `json:"kubernetes-enabled" pflag:",Enable Kubernetes Logging"`
KubernetesURL string `json:"kubernetes-url" pflag:",Console URL for Kubernetes logs"`
IsCloudwatchEnabled bool `json:"cloudwatch-enabled" pflag:",Enable Cloudwatch Logging"`
// Deprecated: Please use CloudwatchTemplateURI
CloudwatchRegion string `json:"cloudwatch-region" pflag:",AWS region in which Cloudwatch logs are stored."`
// Deprecated: Please use CloudwatchTemplateURI
CloudwatchLogGroup string `json:"cloudwatch-log-group" pflag:",Log group to which streams are associated."`
CloudwatchTemplateURI TemplateURI `json:"cloudwatch-template-uri" pflag:",Template Uri to use when building cloudwatch log links"`

IsKubernetesEnabled bool `json:"kubernetes-enabled" pflag:",Enable Kubernetes Logging"`
// Deprecated: Please use KubernetesTemplateURI
KubernetesURL string `json:"kubernetes-url" pflag:",Console URL for Kubernetes logs"`
KubernetesTemplateURI TemplateURI `json:"kubernetes-template-uri" pflag:",Template Uri to use when building kubernetes log links"`

IsStackDriverEnabled bool `json:"stackdriver-enabled" pflag:",Enable Log-links to stackdriver"`
// Deprecated: Please use StackDriverTemplateURI
GCPProjectName string `json:"gcp-project" pflag:",Name of the project in GCP"`
// Deprecated: Please use StackDriverTemplateURI
StackdriverLogResourceName string `json:"stackdriver-logresourcename" pflag:",Name of the logresource in stackdriver"`
StackDriverTemplateURI TemplateURI `json:"stackdriver-template-uri" pflag:",Template Uri to use when building stackdriver log links"`

Templates []TemplateLogPluginConfig `json:"templates" pflag:"-,"`
}

IsStackDriverEnabled bool `json:"stackdriver-enabled" pflag:",Enable Log-links to stackdriver"`
GCPProjectName string `json:"gcp-project" pflag:",Name of the project in GCP"`
StackdriverLogResourceName string `json:"stackdriver-logresourcename" pflag:",Name of the logresource in stackdriver"`
type TemplateLogPluginConfig struct {
DisplayName string `json:"displayName" pflag:",Display name for the generated log when displayed in the console."`
TemplateURIs []TemplateURI `json:"templateUris" pflag:",URI Templates for generating task log links."`
MessageFormat core.TaskLog_MessageFormat `json:"messageFormat" pflag:",Log Message Format."`
}

var (
Expand Down
3 changes: 3 additions & 0 deletions flyteplugins/go/tasks/logs/logconfig_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

66 changes: 66 additions & 0 deletions flyteplugins/go/tasks/logs/logconfig_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

119 changes: 93 additions & 26 deletions flyteplugins/go/tasks/logs/logging_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,28 @@ package logs

import (
"context"
"fmt"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/tasklog"

logUtils "github.com/lyft/flyteidl/clients/go/coreutils/logs"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytestdlib/logger"
v1 "k8s.io/api/core/v1"
)

func GetLogsForContainerInPod(ctx context.Context, pod *v1.Pod, index uint32, nameSuffix string) ([]*core.TaskLog, error) {
var logs []*core.TaskLog
logConfig := GetLogConfig()

logPlugins := map[string]logUtils.LogPlugin{}
type logPlugin struct {
Name string
Plugin tasklog.Plugin
}

if logConfig.IsKubernetesEnabled {
logPlugins["Kubernetes Logs"] = logUtils.NewKubernetesLogPlugin(logConfig.KubernetesURL)
}
if logConfig.IsCloudwatchEnabled {
logPlugins["Cloudwatch Logs"] = logUtils.NewCloudwatchLogPlugin(logConfig.CloudwatchRegion, logConfig.CloudwatchLogGroup)
}
if logConfig.IsStackDriverEnabled {
logPlugins["Stackdriver Logs"] = logUtils.NewStackdriverLogPlugin(logConfig.GCPProjectName, logConfig.StackdriverLogResourceName)
// Internal
func GetLogsForContainerInPod(ctx context.Context, pod *v1.Pod, index uint32, nameSuffix string) ([]*core.TaskLog, error) {
logPlugin, err := InitializeLogPlugins(GetLogConfig())
if err != nil {
return nil, err
}

if len(logPlugins) == 0 {
if logPlugin == nil {
return nil, nil
}

Expand All @@ -44,18 +42,87 @@ func GetLogsForContainerInPod(ctx context.Context, pod *v1.Pod, index uint32, na
return nil, nil
}

for name, plugin := range logPlugins {
log, err := plugin.GetTaskLog(
pod.Name,
pod.Namespace,
pod.Spec.Containers[index].Name,
pod.Status.ContainerStatuses[index].ContainerID,
name+nameSuffix,
)
logs, err := logPlugin.GetTaskLogs(
tasklog.Input{
PodName: pod.Name,
Namespace: pod.Namespace,
ContainerName: pod.Spec.Containers[index].Name,
ContainerID: pod.Status.ContainerStatuses[index].ContainerID,
LogName: nameSuffix,
},
)

if err != nil {
return nil, err
}

return logs.TaskLogs, nil
}

type taskLogPluginWrapper struct {
logPlugins []logPlugin
}

func (t taskLogPluginWrapper) GetTaskLogs(input tasklog.Input) (logOutput tasklog.Output, err error) {
logs := make([]*core.TaskLog, 0, len(t.logPlugins))
suffix := input.LogName
for _, plugin := range t.logPlugins {
input.LogName = plugin.Name + suffix
o, err := plugin.Plugin.GetTaskLogs(input)
if err != nil {
return nil, err
return tasklog.Output{}, err
}

logs = append(logs, o.TaskLogs...)
}

return tasklog.Output{
TaskLogs: logs,
}, nil
}

// Internal
func InitializeLogPlugins(cfg *LogConfig) (tasklog.Plugin, error) {
// Use a list to maintain order.
logPlugins := make([]logPlugin, 0, 2)

if cfg.IsKubernetesEnabled {
if len(cfg.KubernetesTemplateURI) > 0 {
logPlugins = append(logPlugins, logPlugin{Name: "Kubernetes Logs", Plugin: tasklog.NewTemplateLogPlugin([]string{cfg.KubernetesTemplateURI}, core.TaskLog_JSON)})
} else {
logPlugins = append(logPlugins, logPlugin{Name: "Kubernetes Logs", Plugin: tasklog.NewTemplateLogPlugin([]string{fmt.Sprintf("%s/#!/log/{{ .namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}", cfg.KubernetesURL)}, core.TaskLog_JSON)})
}
}

if cfg.IsCloudwatchEnabled {
if len(cfg.CloudwatchTemplateURI) > 0 {
logPlugins = append(logPlugins, logPlugin{Name: "Cloudwatch Logs", Plugin: tasklog.NewTemplateLogPlugin([]string{cfg.CloudwatchTemplateURI}, core.TaskLog_JSON)})
} else {
logPlugins = append(logPlugins, logPlugin{Name: "Cloudwatch Logs", Plugin: tasklog.NewTemplateLogPlugin(
[]string{fmt.Sprintf("https://console.aws.amazon.com/cloudwatch/home?region=%s#logEventViewer:group=%s;stream=var.log.containers.{{ .podName }}_{{ .namespace }}_{{ .containerName }}-{{ .containerId }}.log", cfg.CloudwatchRegion, cfg.CloudwatchLogGroup)}, core.TaskLog_JSON)})
}
}

if cfg.IsStackDriverEnabled {
if len(cfg.StackDriverTemplateURI) > 0 {
logPlugins = append(logPlugins, logPlugin{Name: "Stackdriver Logs", Plugin: tasklog.NewTemplateLogPlugin([]string{cfg.StackDriverTemplateURI}, core.TaskLog_JSON)})
} else {
logPlugins = append(logPlugins, logPlugin{Name: "Stackdriver Logs", Plugin: tasklog.NewTemplateLogPlugin(
[]string{fmt.Sprintf("https://console.cloud.google.com/logs/viewer?project=%s&angularJsUrl=%%2Flogs%%2Fviewer%%3Fproject%%3D%s&resource=%s&advancedFilter=resource.labels.pod_name%%3D{{ .podName }}", cfg.GCPProjectName, cfg.GCPProjectName, cfg.StackdriverLogResourceName)}, core.TaskLog_JSON)})
}
logs = append(logs, &log)
}
return logs, nil

if len(cfg.Templates) > 0 {
for _, cfg := range cfg.Templates {
logPlugins = append(logPlugins, logPlugin{Name: cfg.DisplayName, Plugin: tasklog.NewTemplateLogPlugin(cfg.TemplateURIs, cfg.MessageFormat)})
}
}

if len(logPlugins) == 0 {
return nil, nil
}

return taskLogPluginWrapper{
logPlugins: logPlugins,
}, nil
}
Loading

0 comments on commit f0b3882

Please sign in to comment.