Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge kubectl logs #227

Merged
merged 6 commits into from Jul 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions cli/cmd/get.go
Expand Up @@ -542,6 +542,10 @@ func dataResourceTable(resources []context.Resource, dataStatuses map[string]*re
func apiResourceTable(apiGroupStatuses map[string]*resource.APIGroupStatus) string {
rows := make([][]interface{}, 0, len(apiGroupStatuses))
for name, groupStatus := range apiGroupStatuses {
if groupStatus.Requested == 0 {
continue
}

var updatedAt *time.Time
if groupStatus.ActiveStatus != nil {
updatedAt = groupStatus.ActiveStatus.Start
Expand Down
250 changes: 202 additions & 48 deletions pkg/operator/workloads/logs.go
Expand Up @@ -18,6 +18,8 @@ package workloads

import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"os"
Expand All @@ -26,19 +28,22 @@ import (
"time"

"github.com/gorilla/websocket"

kcore "k8s.io/api/core/v1"

"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/k8s"
"github.com/cortexlabs/cortex/pkg/lib/pointer"
"github.com/cortexlabs/cortex/pkg/lib/sets/strset"
"github.com/cortexlabs/cortex/pkg/operator/config"
)

const (
	// writeWait is the deadline applied to each websocket write.
	writeWait = 10 * time.Second
	// closeGracePeriod is how long the socket is kept open after the close
	// message has been sent, before it is actually closed.
	closeGracePeriod = 10 * time.Second
	// maxMessageSize is presumably the websocket read limit in bytes; its use
	// is not visible in this part of the file -- TODO confirm.
	maxMessageSize = 8192
	// podCheckInterval is the delay between two pod list refreshes in podCheck.
	podCheckInterval = 5 * time.Second
	// maxParallelPodLogging caps how many pods have their logs streamed at once.
	maxParallelPodLogging = 5
	// initLogTailLines is the --tail value passed to kubectl logs for
	// user-facing API containers.
	initLogTailLines = 20
)

func ReadLogs(appName string, workloadID string, verbose bool, socket *websocket.Conn) {
Expand All @@ -56,40 +61,46 @@ func ReadLogs(appName string, workloadID string, verbose bool, socket *websocket
}

if len(pods) > 0 {
if len(pods) > 1 {
if !writeSocket(fmt.Sprintf("%d pods available, streaming logs for one of them:", len(pods)), socket) {
if len(pods) > maxParallelPodLogging {
if !writeSocket(fmt.Sprintf("\n%d pods available, streaming logs for %d of them:", len(pods), maxParallelPodLogging), socket) {
return
}
}

podMap := make(map[k8s.PodStatus][]kcore.Pod)
for _, pod := range pods {
podMap[k8s.GetPodStatus(&pod)] = append(podMap[k8s.GetPodStatus(&pod)], pod)
podStatus := k8s.GetPodStatus(&pod)
if len(podMap[podStatus]) < maxParallelPodLogging {
podMap[podStatus] = append(podMap[podStatus], pod)
}
}

switch {
case len(podMap[k8s.PodStatusSucceeded]) > 0:
getKubectlLogs(&podMap[k8s.PodStatusSucceeded][0], verbose, wrotePending, false, socket)
getKubectlLogs(podMap[k8s.PodStatusSucceeded], verbose, wrotePending, false, socket)
case len(podMap[k8s.PodStatusRunning]) > 0:
getKubectlLogs(&podMap[k8s.PodStatusRunning][0], verbose, wrotePending, false, socket)
getKubectlLogs(podMap[k8s.PodStatusRunning], verbose, wrotePending, false, socket)
case len(podMap[k8s.PodStatusPending]) > 0:
getKubectlLogs(&podMap[k8s.PodStatusPending][0], verbose, wrotePending, false, socket)
getKubectlLogs(podMap[k8s.PodStatusPending], verbose, wrotePending, false, socket)
case len(podMap[k8s.PodStatusKilled]) > 0:
getKubectlLogs(&podMap[k8s.PodStatusKilled][0], verbose, wrotePending, false, socket)
getKubectlLogs(podMap[k8s.PodStatusKilled], verbose, wrotePending, false, socket)
case len(podMap[k8s.PodStatusKilledOOM]) > 0:
getKubectlLogs(&podMap[k8s.PodStatusKilledOOM][0], verbose, wrotePending, false, socket)
getKubectlLogs(podMap[k8s.PodStatusKilledOOM], verbose, wrotePending, false, socket)
case len(podMap[k8s.PodStatusFailed]) > 0:
previous := false
if pods[0].Labels["workloadType"] == WorkloadTypeAPI {
previous = true
}
getKubectlLogs(&podMap[k8s.PodStatusFailed][0], verbose, wrotePending, previous, socket)
getKubectlLogs(podMap[k8s.PodStatusFailed], verbose, wrotePending, previous, socket)
case len(podMap[k8s.PodStatusTerminating]) > 0:
getKubectlLogs(&podMap[k8s.PodStatusTerminating][0], verbose, wrotePending, false, socket)
getKubectlLogs(podMap[k8s.PodStatusTerminating], verbose, wrotePending, false, socket)
case len(podMap[k8s.PodStatusUnknown]) > 0:
getKubectlLogs(&podMap[k8s.PodStatusUnknown][0], verbose, wrotePending, false, socket)
getKubectlLogs(podMap[k8s.PodStatusUnknown], verbose, wrotePending, false, socket)
default: // unexpected
getKubectlLogs(&pods[0], verbose, wrotePending, false, socket)
if len(pods) > maxParallelPodLogging {
pods = pods[:maxParallelPodLogging]
}
getKubectlLogs(pods, verbose, wrotePending, false, socket)
}
return
}
Expand Down Expand Up @@ -118,7 +129,7 @@ func ReadLogs(appName string, workloadID string, verbose bool, socket *websocket
if !writeSocket("\nFailed to start:\n", socket) {
return
}
getKubectlLogs(failedArgoPod, true, false, false, socket)
getKubectlLogs([]kcore.Pod{*failedArgoPod}, true, false, false, socket)
return
}

Expand All @@ -133,52 +144,169 @@ func ReadLogs(appName string, workloadID string, verbose bool, socket *websocket
}
}

func getKubectlLogs(pod *kcore.Pod, verbose bool, wrotePending bool, previous bool, socket *websocket.Conn) {
cmdPath := "/usr/local/bin/kubectl"
func getKubectlLogs(pods []kcore.Pod, verbose bool, wrotePending bool, previous bool, socket *websocket.Conn) {
isAllPending := true
for _, pod := range pods {
if k8s.GetPodStatus(&pod) != k8s.PodStatusPending {
isAllPending = false
break
}
}

if k8s.GetPodStatus(pod) == k8s.PodStatusPending {
if !wrotePending {
if !writeSocket("\nPending", socket) {
return
}
if isAllPending {
if !writeSocket("\nPending", socket) {
return
}
config.Kubernetes.WaitForPodRunning(pod.Name, 1)
}

args := []string{"kubectl", "-n=" + config.Cortex.Namespace, "logs", "--follow=true"}
inr, inw, err := os.Pipe()
if err != nil {
errors.Panic(err, "logs", "kubectl", "os.pipe")
}
defer inw.Close()
defer inr.Close()

podCheckCancel := make(chan struct{})
defer close(podCheckCancel)

go podCheck(podCheckCancel, socket, pods, previous, verbose, inr)
pumpStdin(socket, inw)
podCheckCancel <- struct{}{}
}

func startKubectlProcess(pod kcore.Pod, previous bool, attrs *os.ProcAttr) (*os.Process, error) {
cmdPath := "/bin/bash"

kubectlArgs := []string{"kubectl", "-n=" + config.Cortex.Namespace, "logs", "--follow=true"}
if previous {
args = append(args, "--previous")
kubectlArgs = append(kubectlArgs, "--previous")
}

args = append(args, pod.Name)
identifier := pod.Name
kubectlArgs = append(kubectlArgs, pod.Name)
if pod.Labels["workloadType"] == WorkloadTypeAPI && pod.Labels["userFacing"] == "true" {
args = append(args, apiContainerName)
kubectlArgs = append(kubectlArgs, apiContainerName)
kubectlArgs = append(kubectlArgs, fmt.Sprintf("--tail=%d", initLogTailLines))
identifier += " " + apiContainerName
}

outr, outw, err := os.Pipe()
labelLog := fmt.Sprintf(" | while read -r; do echo \"[%s] $REPLY \" | tail -n +1; done", identifier)
deliahu marked this conversation as resolved.
Show resolved Hide resolved
kubectlCmd := strings.Join(kubectlArgs, " ")
bashArgs := []string{"/bin/bash", "-c", kubectlCmd + labelLog}
process, err := os.StartProcess(cmdPath, bashArgs, attrs)
if err != nil {
errors.Panic(err, "logs", "kubectl", "os.pipe")
return nil, errors.Wrap(err, strings.Join(bashArgs, " "))
}
defer outr.Close()
defer outw.Close()
deliahu marked this conversation as resolved.
Show resolved Hide resolved

inr, inw, err := os.Pipe()
return process, nil
}

func podCheck(podCheckCancel chan struct{}, socket *websocket.Conn, initialPodList []kcore.Pod, previous bool, verbose bool, inr *os.File) {
timer := time.NewTimer(0)
defer timer.Stop()

processMap := make(map[string]*os.Process)
defer deleteProcesses(processMap)
labels := initialPodList[0].GetLabels()
appName := labels["appName"]
workloadID := labels["workloadID"]

outr, outw, err := os.Pipe()
if err != nil {
errors.Panic(err, "logs", "kubectl", "os.pipe")
}
defer inr.Close()
defer inw.Close()
defer outw.Close()
defer outr.Close()

process, err := os.StartProcess(cmdPath, args, &os.ProcAttr{
Files: []*os.File{inr, outw, outw},
})
if err != nil {
errors.Panic(err, strings.Join(args, " "))
socketWriterError := make(chan error, 1)
defer close(socketWriterError)

go pumpStdout(socket, socketWriterError, outr, verbose, true)

for {
select {
case <-podCheckCancel:
return
case <-timer.C:
pods, err := config.Kubernetes.ListPodsByLabels(map[string]string{
"appName": appName,
"workloadID": workloadID,
"userFacing": "true",
})

if err != nil {
socketWriterError <- errors.Wrap(err, "pod check")
return
}

latestRunningPodsMap := make(map[string]kcore.Pod)
latestRunningPods := strset.New()
for _, pod := range pods {
if k8s.GetPodStatus(&pod) != k8s.PodStatusPending {
latestRunningPods.Add(pod.GetName())
latestRunningPodsMap[pod.GetName()] = pod
}
}

prevRunningPods := strset.New()
for podName := range processMap {
prevRunningPods.Add(podName)
}

newPods := strset.Difference(latestRunningPods, prevRunningPods)
podsToDelete := strset.Difference(prevRunningPods, latestRunningPods)
podsToKeep := strset.Intersection(prevRunningPods, latestRunningPods)

// Prioritize adding running pods
podsToAddRunning := []string{}
podsToAddNotRunning := []string{}

for podName := range newPods {
pod := latestRunningPodsMap[podName]
if k8s.GetPodStatus(&pod) == k8s.PodStatusRunning {
podsToAddRunning = append(podsToAddRunning, podName)
} else {
podsToAddNotRunning = append(podsToAddNotRunning, podName)
}
}
podsToAdd := append(podsToAddRunning, podsToAddNotRunning...)

maxPodsToAdd := maxParallelPodLogging - len(podsToKeep)
if len(podsToAdd) < maxPodsToAdd {
maxPodsToAdd = len(podsToAdd)
}

for _, podName := range podsToAdd[:maxPodsToAdd] {
process, err := startKubectlProcess(latestRunningPodsMap[podName], previous, &os.ProcAttr{
Files: []*os.File{inr, outw, outw},
})
if err != nil {
socketWriterError <- err
return
}
processMap[podName] = process
}

deleteMap := make(map[string]*os.Process, len(podsToDelete))

for podName := range podsToDelete {
deleteMap[podName] = processMap[podName]
deliahu marked this conversation as resolved.
Show resolved Hide resolved
delete(processMap, podName)
}
deleteProcesses(deleteMap)
timer.Reset(podCheckInterval)
}
}
}

go pumpStdout(socket, outr, verbose, true)
pumpStdin(socket, inw)
stopProcess(process)
// deleteProcesses stops every process in processMap.
//
// Each process is first sent os.Interrupt; after a 5 second grace period each
// one is sent os.Kill and then waited on so that the exited child is reaped.
// An empty or nil map returns immediately without sleeping, and nil entries
// are skipped.
func deleteProcesses(processMap map[string]*os.Process) {
	if len(processMap) == 0 {
		return
	}

	for _, process := range processMap {
		if process == nil {
			continue
		}
		// Best effort: the process may already have exited.
		_ = process.Signal(os.Interrupt)
	}

	time.Sleep(5 * time.Second)

	for _, process := range processMap {
		if process == nil {
			continue
		}
		// Best effort: the process may already have exited.
		_ = process.Signal(os.Kill)
		// Reap the child so it does not linger as a zombie.
		_, _ = process.Wait()
	}
}

func getCloudWatchLogs(prefix string, verbose bool, socket *websocket.Conn) {
Expand All @@ -196,7 +324,10 @@ func getCloudWatchLogs(prefix string, verbose bool, socket *websocket.Conn) {
} else {
logsReader = strings.NewReader(logs)
}
go pumpStdout(socket, logsReader, verbose, false)

socketWriterError := make(chan error)
defer close(socketWriterError)
go pumpStdout(socket, socketWriterError, logsReader, verbose, false)

inr, inw, err := os.Pipe()
if err != nil {
Expand Down Expand Up @@ -224,7 +355,7 @@ func pumpStdin(socket *websocket.Conn, writer io.Writer) {
}
}

func pumpStdout(socket *websocket.Conn, reader io.Reader, verbose bool, checkForLastLog bool) {
func pumpStdout(socket *websocket.Conn, socketWriterError chan error, reader io.Reader, verbose bool, checkForLastLog bool) {
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
socket.SetWriteDeadline(time.Now().Add(writeWait))
Expand All @@ -243,14 +374,23 @@ func pumpStdout(socket *websocket.Conn, reader io.Reader, verbose bool, checkFor
}
}

select {
case err := <-socketWriterError:
if err != nil {
writeSocket(err.Error(), socket)
}
default:
}

socket.SetWriteDeadline(time.Now().Add(writeWait))
socket.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
time.Sleep(closeGracePeriod)
socket.Close()
}

// cortexRegex matches a line that starts with a "[...]" label followed by a
// cortex logger prefix such as "INFO:cortex:"; the log level is captured.
var cortexRegex = regexp.MustCompile(`^\[.*\]?(DEBUG|INFO|WARNING|ERROR|CRITICAL):cortex:`)

// tensorflowRegex is the same as cortexRegex but for the tensorflow logger.
var tensorflowRegex = regexp.MustCompile(`^\[.*\]?(DEBUG|INFO|WARNING|ERROR|CRITICAL):tensorflow:`)

// jsonPrefixRegex matches optional leading spaces followed by an opening
// curly or square bracket; the bracket is captured.
var jsonPrefixRegex = regexp.MustCompile(`^\ *?(\{|\[)`)

func formatHeader1(headerString string) *string {
headerBorder := "\n" + strings.Repeat("-", len(headerString)) + "\n"
Expand Down Expand Up @@ -299,6 +439,20 @@ func extractFromCortexLog(match string, loglevel string, logStr string) (*string
return formatHeader3(cutStr), false
}

matches := jsonPrefixRegex.FindStringSubmatch(cutStr)
if len(matches) == 2 {
indentIndex := len(matches[0]) - 1 // matches to curly or square bracket so remove the last char
indentStr := cutStr[:indentIndex]
maybeJSON := cutStr[indentIndex:]
jsonBytes := []byte(maybeJSON)
var obytes bytes.Buffer
err := json.Indent(&obytes, jsonBytes, indentStr, " ")
deliahu marked this conversation as resolved.
Show resolved Hide resolved
if err == nil {
ostr := indentStr + string(obytes.String())
return &ostr, false
}
}

lastLogRe := regexp.MustCompile(`^workload: (\w+), completed: (\S+)`)
if lastLogRe.MatchString(cutStr) {
return &cutStr, true
Expand Down
2 changes: 1 addition & 1 deletion pkg/workloads/cortex/lib/util.py
Expand Up @@ -118,7 +118,7 @@ def log_pretty(obj, indent=0, logging_func=logger.info):


def log_pretty_flat(obj, indent=0, logging_func=logger.info):
    """Log obj as formatted by pp_str_flat.

    indent is forwarded to pp_str_flat (not to the logging function), and the
    resulting string is emitted once through logging_func.
    """
    logging_func(pp_str_flat(obj, indent))


def pluralize(num, singular, plural):
Expand Down