diff --git a/api/k8s/application/k8sApplicationRestHandler.go b/api/k8s/application/k8sApplicationRestHandler.go index 087676758eb..de39e3641a7 100644 --- a/api/k8s/application/k8sApplicationRestHandler.go +++ b/api/k8s/application/k8sApplicationRestHandler.go @@ -1,15 +1,13 @@ package application import ( + "bufio" + "bytes" "context" "encoding/json" "errors" "fmt" "github.com/devtron-labs/common-lib/utils" - "net/http" - "strconv" - "strings" - util3 "github.com/devtron-labs/common-lib/utils/k8s" k8sCommonBean "github.com/devtron-labs/common-lib/utils/k8s/commonBean" "github.com/devtron-labs/common-lib/utils/k8sObjectsUtil" @@ -27,12 +25,19 @@ import ( "github.com/devtron-labs/devtron/pkg/terminal" "github.com/devtron-labs/devtron/util" "github.com/devtron-labs/devtron/util/rbac" + "github.com/google/uuid" "github.com/gorilla/mux" errors2 "github.com/juju/errors" "go.uber.org/zap" "gopkg.in/go-playground/validator.v9" + "io" errors3 "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "net/http" + "regexp" + "strconv" + "strings" + "time" ) type K8sApplicationRestHandler interface { @@ -42,6 +47,7 @@ type K8sApplicationRestHandler interface { DeleteResource(w http.ResponseWriter, r *http.Request) ListEvents(w http.ResponseWriter, r *http.Request) GetPodLogs(w http.ResponseWriter, r *http.Request) + DownloadPodLogs(w http.ResponseWriter, r *http.Request) GetTerminalSession(w http.ResponseWriter, r *http.Request) GetResourceInfo(w http.ResponseWriter, r *http.Request) GetHostUrlsByBatch(w http.ResponseWriter, r *http.Request) @@ -620,11 +626,127 @@ func (handler *K8sApplicationRestHandlerImpl) GetPodLogs(w http.ResponseWriter, common.WriteJsonResp(w, err, nil, http.StatusBadRequest) return } + handler.requestValidationAndRBAC(w, r, token, request) + lastEventId := r.Header.Get(bean2.LastEventID) + isReconnect := false + if len(lastEventId) > 0 { + lastSeenMsgId, err := strconv.ParseInt(lastEventId, bean2.IntegerBase, bean2.IntegerBitSize) + if err != nil { + common.WriteJsonResp(w, err, nil, http.StatusBadRequest) + return + } + lastSeenMsgId = lastSeenMsgId + bean2.TimestampOffsetToAvoidDuplicateLogs //increased by one ns to avoid duplicate + t := v1.Unix(0, lastSeenMsgId) + request.K8sRequest.PodLogsRequest.SinceTime = &t + isReconnect = true + } + stream, err := handler.k8sApplicationService.GetPodLogs(r.Context(), request) + //err is handled inside StartK8sStreamWithHeartBeat method + ctx, cancel := context.WithCancel(r.Context()) + if cn, ok := w.(http.CloseNotifier); ok { + go func(done <-chan struct{}, closed <-chan bool) { + select { + case <-done: + case <-closed: + cancel() + } + }(ctx.Done(), cn.CloseNotify()) + } + defer cancel() + defer util.Close(stream, handler.logger) + handler.pump.StartK8sStreamWithHeartBeat(w, isReconnect, stream, err) +} + +func (handler *K8sApplicationRestHandlerImpl) DownloadPodLogs(w http.ResponseWriter, r *http.Request) { + token := r.Header.Get("token") + request, err := handler.k8sApplicationService.ValidatePodLogsRequestQuery(r) + if err != nil { + common.WriteJsonResp(w, err, nil, http.StatusBadRequest) + return + } + handler.requestValidationAndRBAC(w, r, token, request) + + // just to make sure follow flag is set to false when downloading logs + request.K8sRequest.PodLogsRequest.Follow = false + + stream, err := handler.k8sApplicationService.GetPodLogs(r.Context(), request) + if err != nil { + common.WriteJsonResp(w, err, nil, http.StatusInternalServerError) + return + } + ctx, cancel := context.WithCancel(r.Context()) + if cn, ok := w.(http.CloseNotifier); ok { + go func(done <-chan struct{}, closed <-chan bool) { + select { + case <-done: + case <-closed: + cancel() + } + }(ctx.Done(), cn.CloseNotify()) + } + defer cancel() + defer util.Close(stream, handler.logger) + + var dataBuffer bytes.Buffer + bufReader := bufio.NewReader(stream) + eof := false + for !eof { + log, err := bufReader.ReadString('\n') + log = strings.TrimSpace(log) // Remove trailing line ending + a := regexp.MustCompile(" ") + var res []byte + splitLog := a.Split(log, 2) + if len(splitLog[0]) > 0 { + parsedTime, err := time.Parse(time.RFC3339, splitLog[0]) + if err != nil { + common.WriteJsonResp(w, err, nil, http.StatusInternalServerError) + return + } + gmtTimeLoc := time.FixedZone(bean2.LocalTimezoneInGMT, bean2.LocalTimeOffset) + humanReadableTime := parsedTime.In(gmtTimeLoc).Format(time.RFC1123) + res = append(res, humanReadableTime...) + } + + if len(splitLog) == 2 { + res = append(res, " "...) + res = append(res, splitLog[1]...) + } + res = append(res, "\n"...) + if err == io.EOF { + eof = true + // stop if we reached end of stream and the next line is empty + if log == "" { + break + } + } else if err != nil && err != io.EOF { + common.WriteJsonResp(w, err, nil, http.StatusInternalServerError) + return + } + _, err = dataBuffer.Write(res) + if err != nil { + common.WriteJsonResp(w, err, nil, http.StatusInternalServerError) + return + } + } + if len(dataBuffer.Bytes()) == 0 { + common.WriteJsonResp(w, nil, nil, http.StatusNoContent) + return + } + podLogsFilename := generatePodLogsFilename(request.K8sRequest.ResourceIdentifier.Name) + common.WriteOctetStreamResp(w, r, dataBuffer.Bytes(), podLogsFilename) + return +} + +func generatePodLogsFilename(filename string) string { + return fmt.Sprintf("podlogs-%s-%s.log", filename, uuid.New().String()) +} + +func (handler *K8sApplicationRestHandlerImpl) requestValidationAndRBAC(w http.ResponseWriter, r *http.Request, token string, request *k8s.ResourceRequestBean) { if request.AppIdentifier != nil { if request.DeploymentType == bean2.HelmInstalledType { valid, err := handler.k8sApplicationService.ValidateResourceRequest(r.Context(), request.AppIdentifier, request.K8sRequest) if err != nil || !valid { - handler.logger.Errorw("error in validating resource request", "err", err) + handler.logger.Errorw("error in validating resource request", "err", err, "request.AppIdentifier", request.AppIdentifier, "request.K8sRequest", request.K8sRequest) apiError := util2.ApiError{ InternalMessage: "failed to validate the resource with error " + err.Error(), UserMessage: "Failed to validate resource", @@ -671,34 +793,6 @@ func (handler *K8sApplicationRestHandlerImpl) GetPodLogs(w http.ResponseWriter, common.WriteJsonResp(w, errors.New("can not get pod logs as target cluster is not provided"), nil, http.StatusBadRequest) return } - lastEventId := r.Header.Get("Last-Event-ID") - isReconnect := false - if len(lastEventId) > 0 { - lastSeenMsgId, err := strconv.ParseInt(lastEventId, 10, 64) - if err != nil { - common.WriteJsonResp(w, err, nil, http.StatusBadRequest) - return - } - lastSeenMsgId = lastSeenMsgId + 1 //increased by one ns to avoid duplicate - t := v1.Unix(0, lastSeenMsgId) - request.K8sRequest.PodLogsRequest.SinceTime = &t - isReconnect = true - } - stream, err := handler.k8sApplicationService.GetPodLogs(r.Context(), request) - //err is handled inside StartK8sStreamWithHeartBeat method - ctx, cancel := context.WithCancel(r.Context()) - if cn, ok := w.(http.CloseNotifier); ok { - go func(done <-chan struct{}, closed <-chan bool) { - select { - case <-done: - case <-closed: - cancel() - } - }(ctx.Done(), cn.CloseNotify()) - } - defer cancel() - defer util.Close(stream, handler.logger) - handler.pump.StartK8sStreamWithHeartBeat(w, isReconnect, stream, err) } func (handler *K8sApplicationRestHandlerImpl) GetTerminalSession(w http.ResponseWriter, r *http.Request) { diff --git a/api/k8s/application/k8sApplicationRouter.go b/api/k8s/application/k8sApplicationRouter.go index ae5e4dd1e29..857a131d6b5 100644 --- a/api/k8s/application/k8sApplicationRouter.go +++ b/api/k8s/application/k8sApplicationRouter.go @@ -42,13 +42,13 @@ func (impl *K8sApplicationRouterImpl) InitK8sApplicationRouter(k8sAppRouter *mux k8sAppRouter.Path("/pods/logs/{podName}"). Queries("containerName", "{containerName}"). - //Queries("containerName", "{containerName}", "appId", "{appId}"). - //Queries("clusterId", "{clusterId}", "namespace", "${namespace}"). - //Queries("sinceSeconds", "{sinceSeconds}"). Queries("follow", "{follow}"). - Queries("tailLines", "{tailLines}"). HandlerFunc(impl.k8sApplicationRestHandler.GetPodLogs).Methods("GET") + k8sAppRouter.Path("/pods/logs/download/{podName}"). + Queries("containerName", "{containerName}"). + HandlerFunc(impl.k8sApplicationRestHandler.DownloadPodLogs).Methods("GET") + k8sAppRouter.Path("/pod/exec/session/{identifier}/{namespace}/{pod}/{shell}/{container}"). HandlerFunc(impl.k8sApplicationRestHandler.GetTerminalSession).Methods("GET") k8sAppRouter.PathPrefix("/pod/exec/sockjs/ws").Handler(terminal.CreateAttachHandler("/pod/exec/sockjs/ws")) diff --git a/api/restHandler/common/ApiResponseWriter.go b/api/restHandler/common/ApiResponseWriter.go index 4725c933eda..ac64a863772 100644 --- a/api/restHandler/common/ApiResponseWriter.go +++ b/api/restHandler/common/ApiResponseWriter.go @@ -52,11 +52,11 @@ func WriteApiJsonResponseStructured(w http.ResponseWriter, apiResponse *ApiRespo } func WriteOctetStreamResp(w http.ResponseWriter, r *http.Request, byteArr []byte, defaultFilename string) { - w.WriteHeader(http.StatusOK) + w.Header().Set(CONTENT_TYPE, "application/octet-stream") if defaultFilename != "" { w.Header().Set(CONTENT_DISPOSITION, "attachment; filename="+defaultFilename) } - w.Header().Set(CONTENT_TYPE, "application/octet-stream") w.Header().Set(CONTENT_LENGTH, r.Header.Get(CONTENT_LENGTH)) + w.WriteHeader(http.StatusOK) w.Write(byteArr) } diff --git a/go.mod b/go.mod index 9d13a214e33..23e24864430 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/davecgh/go-spew v1.1.1 github.com/deckarep/golang-set v1.8.0 github.com/devtron-labs/authenticator v0.4.33 - github.com/devtron-labs/common-lib v0.0.11 + github.com/devtron-labs/common-lib v0.0.12 github.com/devtron-labs/protos v0.0.0-20230503113602-282404f70fd2 github.com/evanphx/json-patch v5.6.0+incompatible github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 diff --git a/go.sum b/go.sum index 35f5f88032e..8524ccbc183 100644 --- a/go.sum +++ b/go.sum @@ -223,8 +223,8 @@ github.com/denisenkom/go-mssqldb v0.0.0-20190707035753-2be1aa521ff4 h1:YcpmyvADG github.com/denisenkom/go-mssqldb v0.0.0-20190707035753-2be1aa521ff4/go.mod h1:zAg7JM8CkOJ43xKXIj7eRO9kmWm/TW578qo+oDO6tuM= github.com/devtron-labs/authenticator v0.4.33 h1:FpAV3ZgFluaRFcMwPpwxr/mwSipJ16XRvgABq3BzP5Y= github.com/devtron-labs/authenticator v0.4.33/go.mod h1:ozNfT8WcruiSgnUbyp48WVfc41++W6xYXhKFp67lNTU= -github.com/devtron-labs/common-lib v0.0.11 h1:xyVjD09miYhOKt0Oc//kOBWJas/OXsP7dyRoA1Hg90U= -github.com/devtron-labs/common-lib v0.0.11/go.mod h1:95/DizzVXu1kHap/VwEvdxwgd+BvPVYc0bJzt8yqGDU= +github.com/devtron-labs/common-lib v0.0.12 h1:HirqTWtaXWPbfGeqQurjtn26b2Az7sMFZ1JAAz2koNM= +github.com/devtron-labs/common-lib v0.0.12/go.mod h1:95/DizzVXu1kHap/VwEvdxwgd+BvPVYc0bJzt8yqGDU= github.com/devtron-labs/protos v0.0.0-20230503113602-282404f70fd2 h1:/IEIsJTxDZ3hv8uOoCaqdWCXqcv7nCAgX9AP/v84dUY= github.com/devtron-labs/protos v0.0.0-20230503113602-282404f70fd2/go.mod h1:l85jxWHlcSo910hdUfRycL40yGzC6glE93V1sVxVPto= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= diff --git a/pkg/k8s/application/bean/bean.go b/pkg/k8s/application/bean/bean.go index ed18919aa13..9793d96c38a 100644 --- a/pkg/k8s/application/bean/bean.go +++ b/pkg/k8s/application/bean/bean.go @@ -21,6 +21,18 @@ const ( ArgoInstalledType = 1 // Identifier for ArgoCD deployment ) +const ( + LastEventID = "Last-Event-ID" + TimestampOffsetToAvoidDuplicateLogs = 1 + IntegerBase = 10 + IntegerBitSize = 64 +) + +const ( + LocalTimezoneInGMT = "GMT+0530" + LocalTimeOffset = 5*60*60 + 30*60 +) + type ResourceInfo struct { PodName string `json:"podName"` } diff --git a/pkg/k8s/application/k8sApplicationService.go b/pkg/k8s/application/k8sApplicationService.go index a8811a35f08..90bd9643954 100644 --- a/pkg/k8s/application/k8sApplicationService.go +++ b/pkg/k8s/application/k8sApplicationService.go @@ -115,10 +115,32 @@ func (impl *K8sApplicationServiceImpl) ValidatePodLogsRequestQuery(r *http.Reque v, vars := r.URL.Query(), mux.Vars(r) request := &k8s.ResourceRequestBean{} podName := vars["podName"] - /*sinceSeconds, err := strconv.Atoi(v.Get("sinceSeconds")) - if err != nil { - sinceSeconds = 0 - }*/ + sinceSecondsParam := v.Get("sinceSeconds") + var sinceSeconds int + var err error + if len(sinceSecondsParam) > 0 { + sinceSeconds, err = strconv.Atoi(sinceSecondsParam) + if err != nil || sinceSeconds <= 0 { + return nil, &util.ApiError{ + Code: "400", + HttpStatusCode: 400, + UserMessage: "invalid value provided for sinceSeconds", + InternalMessage: "invalid value provided for sinceSeconds"} + } + } + sinceTimeParam := v.Get("sinceTime") + sinceTime := metav1.Unix(0, 0) + if len(sinceTimeParam) > 0 { + sinceTimeVar, err := strconv.ParseInt(sinceTimeParam, 10, 64) + if err != nil || sinceTimeVar <= 0 { + return nil, &util.ApiError{ + Code: "400", + HttpStatusCode: 400, + UserMessage: "invalid value provided for sinceTime", + InternalMessage: "invalid value provided for sinceTime"} + } + sinceTime = metav1.Unix(sinceTimeVar, 0) + } containerName, clusterIdString := v.Get("containerName"), v.Get("clusterId") prevContainerLogs := v.Get("previous") isPrevLogs, err := strconv.ParseBool(prevContainerLogs) @@ -130,9 +152,17 @@ func (impl *K8sApplicationServiceImpl) ValidatePodLogsRequestQuery(r *http.Reque if err != nil { follow = false } - tailLines, err := strconv.Atoi(v.Get("tailLines")) - if err != nil { - tailLines = 0 + tailLinesParam := v.Get("tailLines") + var tailLines int + if len(tailLinesParam) > 0 { + tailLines, err = strconv.Atoi(tailLinesParam) + if err != nil || tailLines <= 0 { + return nil, &util.ApiError{ + Code: "400", + HttpStatusCode: 400, + UserMessage: "invalid value provided for tailLines", + InternalMessage: "invalid value provided for tailLines"} + } } k8sRequest := &k8s2.K8sRequestBean{ ResourceIdentifier: k8s2.ResourceIdentifier{ @@ -140,7 +170,8 @@ func (impl *K8sApplicationServiceImpl) ValidatePodLogsRequestQuery(r *http.Reque GroupVersionKind: schema.GroupVersionKind{}, }, PodLogsRequest: k8s2.PodLogsRequest{ - //SinceTime: sinceSeconds, + SinceSeconds: sinceSeconds, + SinceTime: &sinceTime, TailLines: tailLines, Follow: follow, ContainerName: containerName, @@ -315,7 +346,7 @@ func (impl *K8sApplicationServiceImpl) GetPodLogs(ctx context.Context, request * resourceIdentifier := request.K8sRequest.ResourceIdentifier podLogsRequest := request.K8sRequest.PodLogsRequest - resp, err := impl.K8sUtil.GetPodLogs(ctx, restConfig, resourceIdentifier.Name, resourceIdentifier.Namespace, podLogsRequest.SinceTime, podLogsRequest.TailLines, podLogsRequest.Follow, podLogsRequest.ContainerName, podLogsRequest.IsPrevContainerLogsEnabled) + resp, err := impl.K8sUtil.GetPodLogs(ctx, restConfig, resourceIdentifier.Name, resourceIdentifier.Namespace, podLogsRequest.SinceTime, podLogsRequest.TailLines, podLogsRequest.SinceSeconds, podLogsRequest.Follow, podLogsRequest.ContainerName, podLogsRequest.IsPrevContainerLogsEnabled) if err != nil { impl.logger.Errorw("error in getting pod logs", "err", err, "clusterId", clusterId) return nil, err diff --git a/pkg/k8s/bean.go b/pkg/k8s/bean.go index 61edcd2f570..1245b97d3ba 100644 --- a/pkg/k8s/bean.go +++ b/pkg/k8s/bean.go @@ -16,6 +16,11 @@ type ResourceRequestBean struct { ClusterId int `json:"clusterId"` // clusterId is used when request is for direct cluster (not for helm release) } +type LogsDownloadBean struct { + FileName string `json:"fileName"` + LogsData string `json:"data"` +} + type BatchResourceResponse struct { ManifestResponse *k8s.ManifestResponse Err error diff --git a/vendor/github.com/devtron-labs/common-lib/utils/k8s/K8sUtil.go b/vendor/github.com/devtron-labs/common-lib/utils/k8s/K8sUtil.go index 751ea3bcea1..cb183b6608a 100644 --- a/vendor/github.com/devtron-labs/common-lib/utils/k8s/K8sUtil.go +++ b/vendor/github.com/devtron-labs/common-lib/utils/k8s/K8sUtil.go @@ -1205,7 +1205,7 @@ func (impl K8sServiceImpl) ListEvents(restConfig *rest.Config, namespace string, } -func (impl K8sServiceImpl) GetPodLogs(ctx context.Context, restConfig *rest.Config, name string, namespace string, sinceTime *metav1.Time, tailLines int, follow bool, containerName string, isPrevContainerLogsEnabled bool) (io.ReadCloser, error) { +func (impl K8sServiceImpl) GetPodLogs(ctx context.Context, restConfig *rest.Config, name string, namespace string, sinceTime *metav1.Time, tailLines int, sinceSeconds int, follow bool, containerName string, isPrevContainerLogsEnabled bool) (io.ReadCloser, error) { httpClient, err := OverrideK8sHttpClientWithTracer(restConfig) if err != nil { impl.logger.Errorw("error in getting pod logs", "err", err) @@ -1217,14 +1217,21 @@ func (impl K8sServiceImpl) GetPodLogs(ctx context.Context, restConfig *rest.Conf return nil, err } TailLines := int64(tailLines) + SinceSeconds := int64(sinceSeconds) podLogOptions := &v1.PodLogOptions{ Follow: follow, - TailLines: &TailLines, Container: containerName, Timestamps: true, Previous: isPrevContainerLogsEnabled, } - if sinceTime != nil { + startTime := metav1.Unix(0, 0) + if TailLines > 0 { + podLogOptions.TailLines = &TailLines + } + if SinceSeconds > 0 { + podLogOptions.SinceSeconds = &SinceSeconds + } + if *sinceTime != startTime { podLogOptions.SinceTime = sinceTime } podIf := podClient.Pods(namespace) diff --git a/vendor/github.com/devtron-labs/common-lib/utils/k8s/bean.go b/vendor/github.com/devtron-labs/common-lib/utils/k8s/bean.go index f27ce9676fa..8333cc92c0f 100644 --- a/vendor/github.com/devtron-labs/common-lib/utils/k8s/bean.go +++ b/vendor/github.com/devtron-labs/common-lib/utils/k8s/bean.go @@ -60,6 +60,7 @@ type ResourceListResponse struct { type PodLogsRequest struct { SinceTime *v12.Time `json:"sinceTime,omitempty"` + SinceSeconds int `json:"sinceSeconds,omitempty"` TailLines int `json:"tailLines"` Follow bool `json:"follow"` ContainerName string `json:"containerName"` diff --git a/vendor/modules.txt b/vendor/modules.txt index cb468f06e96..305736d44ff 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -354,7 +354,7 @@ github.com/devtron-labs/authenticator/jwt github.com/devtron-labs/authenticator/middleware github.com/devtron-labs/authenticator/oidc github.com/devtron-labs/authenticator/password -# github.com/devtron-labs/common-lib v0.0.11 +# github.com/devtron-labs/common-lib v0.0.12 ## explicit; go 1.20 github.com/devtron-labs/common-lib/blob-storage github.com/devtron-labs/common-lib/cloud-provider-identifier