diff --git a/Gopkg.lock b/Gopkg.lock
index 9b814baeb52..e2019eeef32 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -1235,6 +1235,7 @@
     "github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger",
     "github.com/grpc-ecosystem/grpc-gateway/runtime",
     "github.com/grpc-ecosystem/grpc-gateway/utilities",
+    "github.com/hpcloud/tail",
     "github.com/kubeflow/pytorch-operator/pkg/apis/pytorch/v1",
     "github.com/kubeflow/tf-operator/pkg/apis/common/v1",
     "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1",
@@ -1272,6 +1273,7 @@
     "k8s.io/client-go/kubernetes/scheme",
     "k8s.io/client-go/plugin/pkg/client/auth/gcp",
     "k8s.io/client-go/rest",
+    "k8s.io/client-go/tools/record",
     "k8s.io/code-generator/cmd/deepcopy-gen",
     "k8s.io/klog",
     "k8s.io/utils/pointer",
diff --git a/cmd/metricscollector/v1alpha3/file-metricscollector/main.go b/cmd/metricscollector/v1alpha3/file-metricscollector/main.go
index aa438a7bd19..e72e07ba7cf 100644
--- a/cmd/metricscollector/v1alpha3/file-metricscollector/main.go
+++ b/cmd/metricscollector/v1alpha3/file-metricscollector/main.go
@@ -40,8 +40,11 @@ package main
 import (
 	"context"
 	"flag"
+	"os"
 	"strings"
+	"time"
 
+	"github.com/hpcloud/tail"
 	"google.golang.org/grpc"
 	"k8s.io/klog"
 
@@ -58,10 +61,38 @@ var pollInterval = flag.Duration("p", common.DefaultPollInterval, "Poll interval
 var timeout = flag.Duration("timeout", common.DefaultTimeout, "Timeout to check if main process of worker container exit")
 var waitAll = flag.Bool("w", common.DefaultWaitAll, "Whether wait for all other main process of container exiting")
 
+// printMetricsFile waits until mFile exists and then logs, via klog, every
+// line that is appended to it. It is started with `go` from main.
+func printMetricsFile(mFile string) {
+	// Wait for the metrics file to be created. Sleep between checks so the
+	// wait does not spin a CPU core.
+	for {
+		_, err := os.Stat(mFile)
+		if err == nil {
+			break
+		}
+		if !os.IsNotExist(err) {
+			klog.Fatalf("could not watch metrics file: %v", err)
+		}
+		time.Sleep(time.Second)
+	}
+
+	// Check the error instead of discarding it: if TailFile fails, t cannot
+	// be used and ranging over t.Lines would not be safe.
+	t, err := tail.TailFile(mFile, tail.Config{Follow: true})
+	if err != nil {
+		klog.Fatalf("could not tail metrics file: %v", err)
+	}
+	for line := range t.Lines {
+		klog.Info(line.Text)
+	}
+}
+
 func main() {
 	flag.Parse()
 	klog.Infof("Trial Name: %s", *trialName)
 
+	go printMetricsFile(*metricsFileName)
 	wopts := common.WaitPidsOpts{
 		PollInterval: *pollInterval,
 		Timeout:      *timeout,
diff --git a/pkg/metricscollector/v1alpha3/common/const.go b/pkg/metricscollector/v1alpha3/common/const.go
index 23b63ae7dd0..b0854f594e5 100644
--- a/pkg/metricscollector/v1alpha3/common/const.go
+++ b/pkg/metricscollector/v1alpha3/common/const.go
@@ -21,5 +21,6 @@ const (
 	DefaultTimeout = 0
 	DefaultWaitAll = false
 
-	MetricCollectorContainerName = "metrics-collector"
+	MetricCollectorContainerName       = "metrics-collector"
+	MetricLoggerCollectorContainerName = "metrics-logger-and-collector"
 )
diff --git a/pkg/webhook/v1alpha3/pod/inject_webhook.go b/pkg/webhook/v1alpha3/pod/inject_webhook.go
index e02c9b7c21e..74f74f0e08a 100644
--- a/pkg/webhook/v1alpha3/pod/inject_webhook.go
+++ b/pkg/webhook/v1alpha3/pod/inject_webhook.go
@@ -149,8 +149,9 @@ func (s *sidecarInjector) Mutate(pod *v1.Pod, namespace string) (*v1.Pod, error)
 		return nil, err
 	}
 	args := getMetricsCollectorArgs(trialName, metricName, trial.Spec.MetricsCollector)
+	sidecarContainerName := getSidecarContainerName(trial.Spec.MetricsCollector.Collector.Kind)
 	injectContainer := v1.Container{
-		Name:            mccommon.MetricCollectorContainerName,
+		Name:            sidecarContainerName,
 		Image:           image,
 		Args:            args,
 		ImagePullPolicy: v1.PullIfNotPresent,
@@ -161,7 +162,7 @@ func (s *sidecarInjector) Mutate(pod *v1.Pod, namespace string) (*v1.Pod, error)
 
 	if mountPath, pathKind := getMountPath(trial.Spec.MetricsCollector); mountPath != "" {
 		wrapWorkerContainer(mutatedPod, kind, mountPath, trial.Spec.MetricsCollector)
-		if err = mutateVolume(mutatedPod, kind, mountPath, pathKind); err != nil {
+		if err = mutateVolume(mutatedPod, kind, mountPath, sidecarContainerName, pathKind); err != nil {
 			return nil, err
 		}
 	}
@@ -244,7 +245,7 @@ func wrapWorkerContainer(pod *v1.Pod, jobKind, metricsFile string, mc common.Met
 			if c.Args != nil {
 				args = append(args, c.Args...)
 			}
-			redirectStr := fmt.Sprintf(" 2>&1 | tee %s", metricsFile)
+			redirectStr := fmt.Sprintf("1>%s 2>&1", metricsFile)
 			args = append(args, redirectStr)
 			argsStr := strings.Join(args, " ")
 			c.Command = command
@@ -275,7 +276,7 @@ func isWorkerContainer(jobKind string, index int, c v1.Container) bool {
 	return false
 }
 
-func mutateVolume(pod *v1.Pod, jobKind, mountPath string, pathKind common.FileSystemKind) error {
+func mutateVolume(pod *v1.Pod, jobKind, mountPath, sidecarContainerName string, pathKind common.FileSystemKind) error {
 	metricsVol := v1.Volume{
 		Name: common.MetricsVolume,
 		VolumeSource: v1.VolumeSource{
@@ -293,7 +294,7 @@ func mutateVolume(pod *v1.Pod, jobKind, mountPath string, pathKind common.FileSy
 	index_list := []int{}
 	for i, c := range pod.Spec.Containers {
 		shouldMount := false
-		if c.Name == mccommon.MetricCollectorContainerName {
+		if c.Name == sidecarContainerName {
 			shouldMount = true
 		} else {
 			shouldMount = isWorkerContainer(jobKind, i, c)
@@ -314,3 +315,13 @@ func mutateVolume(pod *v1.Pod, jobKind, mountPath string, pathKind common.FileSy
 
 	return nil
 }
+
+// getSidecarContainerName returns the name to give the injected sidecar
+// container for the given collector kind: the logger-and-collector name for
+// the StdOut and File collectors, the plain collector name otherwise.
+func getSidecarContainerName(cKind common.CollectorKind) string {
+	if cKind == common.StdOutCollector || cKind == common.FileCollector {
+		return mccommon.MetricLoggerCollectorContainerName
+	}
+	return mccommon.MetricCollectorContainerName
+}