Skip to content

Commit

Permalink
Fix worker error silent (kubeflow#863)
Browse files Browse the repository at this point in the history
* Fix worker error silent

* Use 3rd golang tail to replace linux os command

* Rename sidecar container name
  • Loading branch information
hougangliu authored and k8s-ci-robot committed Oct 9, 2019
1 parent 5cdbde6 commit d39ee12
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 6 deletions.
2 changes: 2 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions cmd/metricscollector/v1alpha3/file-metricscollector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ package main
import (
"context"
"flag"
"os"
"strings"

"github.com/hpcloud/tail"
"google.golang.org/grpc"
"k8s.io/klog"

Expand All @@ -58,10 +60,29 @@ var pollInterval = flag.Duration("p", common.DefaultPollInterval, "Poll interval
var timeout = flag.Duration("timeout", common.DefaultTimeout, "Timeout to check if main process of worker container exit")
var waitAll = flag.Bool("w", common.DefaultWaitAll, "Whether wait for all other main process of container exiting")

func printMetricsFile(mFile string) {
for {
_, err := os.Stat(mFile)
if err == nil {
break
} else if os.IsNotExist(err) {
continue
} else {
klog.Fatalf("could not watch metrics file: %v", err)
}
}

t, _ := tail.TailFile(mFile, tail.Config{Follow: true})
for line := range t.Lines {
klog.Info(line.Text)
}
}

func main() {
flag.Parse()
klog.Infof("Trial Name: %s", *trialName)

go printMetricsFile(*metricsFileName)
wopts := common.WaitPidsOpts{
PollInterval: *pollInterval,
Timeout: *timeout,
Expand Down
3 changes: 2 additions & 1 deletion pkg/metricscollector/v1alpha3/common/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ const (
DefaultTimeout = 0
DefaultWaitAll = false

MetricCollectorContainerName = "metrics-collector"
MetricCollectorContainerName = "metrics-collector"
MetricLoggerCollectorContainerName = "metrics-logger-and-collector"
)
19 changes: 14 additions & 5 deletions pkg/webhook/v1alpha3/pod/inject_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,9 @@ func (s *sidecarInjector) Mutate(pod *v1.Pod, namespace string) (*v1.Pod, error)
return nil, err
}
args := getMetricsCollectorArgs(trialName, metricName, trial.Spec.MetricsCollector)
sidecarContainerName := getSidecarContainerName(trial.Spec.MetricsCollector.Collector.Kind)
injectContainer := v1.Container{
Name: mccommon.MetricCollectorContainerName,
Name: sidecarContainerName,
Image: image,
Args: args,
ImagePullPolicy: v1.PullIfNotPresent,
Expand All @@ -161,7 +162,7 @@ func (s *sidecarInjector) Mutate(pod *v1.Pod, namespace string) (*v1.Pod, error)

if mountPath, pathKind := getMountPath(trial.Spec.MetricsCollector); mountPath != "" {
wrapWorkerContainer(mutatedPod, kind, mountPath, trial.Spec.MetricsCollector)
if err = mutateVolume(mutatedPod, kind, mountPath, pathKind); err != nil {
if err = mutateVolume(mutatedPod, kind, mountPath, sidecarContainerName, pathKind); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -244,7 +245,7 @@ func wrapWorkerContainer(pod *v1.Pod, jobKind, metricsFile string, mc common.Met
if c.Args != nil {
args = append(args, c.Args...)
}
redirectStr := fmt.Sprintf(" 2>&1 | tee %s", metricsFile)
redirectStr := fmt.Sprintf("1>%s 2>&1", metricsFile)
args = append(args, redirectStr)
argsStr := strings.Join(args, " ")
c.Command = command
Expand Down Expand Up @@ -275,7 +276,7 @@ func isWorkerContainer(jobKind string, index int, c v1.Container) bool {
return false
}

func mutateVolume(pod *v1.Pod, jobKind, mountPath string, pathKind common.FileSystemKind) error {
func mutateVolume(pod *v1.Pod, jobKind, mountPath, sidecarContainerName string, pathKind common.FileSystemKind) error {
metricsVol := v1.Volume{
Name: common.MetricsVolume,
VolumeSource: v1.VolumeSource{
Expand All @@ -293,7 +294,7 @@ func mutateVolume(pod *v1.Pod, jobKind, mountPath string, pathKind common.FileSy
index_list := []int{}
for i, c := range pod.Spec.Containers {
shouldMount := false
if c.Name == mccommon.MetricCollectorContainerName {
if c.Name == sidecarContainerName {
shouldMount = true
} else {
shouldMount = isWorkerContainer(jobKind, i, c)
Expand All @@ -314,3 +315,11 @@ func mutateVolume(pod *v1.Pod, jobKind, mountPath string, pathKind common.FileSy

return nil
}

func getSidecarContainerName(cKind common.CollectorKind) string {
if cKind == common.StdOutCollector || cKind == common.FileCollector {
return mccommon.MetricLoggerCollectorContainerName
} else {
return mccommon.MetricCollectorContainerName
}
}

0 comments on commit d39ee12

Please sign in to comment.