Skip to content

Commit

Permalink
feat: added support for downloading pod logs (#4539)
Browse files Browse the repository at this point in the history
* wip

* added support for download pod logs

* set follow flag to false on backend as well

* updated common lib vrsion

* changed response type from octet stream to json

* changed the ordering of headers and writes in octet stream

* changed file extension

* refactoring

* fix

* time format converted

* validation fix

* fix

* added return statement

* time format and http status code fixed

* fix

* fix

* time format fixed

* fix

* const updated

* status code fix

* fix

* fix2

* fix3

* common lib version upgrade
  • Loading branch information
ashishdevtron committed Feb 5, 2024
1 parent d123afc commit 95f7e2d
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 55 deletions.
160 changes: 127 additions & 33 deletions api/k8s/application/k8sApplicationRestHandler.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package application

import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"github.com/devtron-labs/common-lib/utils"
"net/http"
"strconv"
"strings"

util3 "github.com/devtron-labs/common-lib/utils/k8s"
k8sCommonBean "github.com/devtron-labs/common-lib/utils/k8s/commonBean"
"github.com/devtron-labs/common-lib/utils/k8sObjectsUtil"
Expand All @@ -27,12 +25,19 @@ import (
"github.com/devtron-labs/devtron/pkg/terminal"
"github.com/devtron-labs/devtron/util"
"github.com/devtron-labs/devtron/util/rbac"
"github.com/google/uuid"
"github.com/gorilla/mux"
errors2 "github.com/juju/errors"
"go.uber.org/zap"
"gopkg.in/go-playground/validator.v9"
"io"
errors3 "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"net/http"
"regexp"
"strconv"
"strings"
"time"
)

type K8sApplicationRestHandler interface {
Expand All @@ -42,6 +47,7 @@ type K8sApplicationRestHandler interface {
DeleteResource(w http.ResponseWriter, r *http.Request)
ListEvents(w http.ResponseWriter, r *http.Request)
GetPodLogs(w http.ResponseWriter, r *http.Request)
DownloadPodLogs(w http.ResponseWriter, r *http.Request)
GetTerminalSession(w http.ResponseWriter, r *http.Request)
GetResourceInfo(w http.ResponseWriter, r *http.Request)
GetHostUrlsByBatch(w http.ResponseWriter, r *http.Request)
Expand Down Expand Up @@ -620,11 +626,127 @@ func (handler *K8sApplicationRestHandlerImpl) GetPodLogs(w http.ResponseWriter,
common.WriteJsonResp(w, err, nil, http.StatusBadRequest)
return
}
handler.requestValidationAndRBAC(w, r, token, request)
lastEventId := r.Header.Get(bean2.LastEventID)
isReconnect := false
if len(lastEventId) > 0 {
lastSeenMsgId, err := strconv.ParseInt(lastEventId, bean2.IntegerBase, bean2.IntegerBitSize)
if err != nil {
common.WriteJsonResp(w, err, nil, http.StatusBadRequest)
return
}
lastSeenMsgId = lastSeenMsgId + bean2.TimestampOffsetToAvoidDuplicateLogs //increased by one ns to avoid duplicate
t := v1.Unix(0, lastSeenMsgId)
request.K8sRequest.PodLogsRequest.SinceTime = &t
isReconnect = true
}
stream, err := handler.k8sApplicationService.GetPodLogs(r.Context(), request)
//err is handled inside StartK8sStreamWithHeartBeat method
ctx, cancel := context.WithCancel(r.Context())
if cn, ok := w.(http.CloseNotifier); ok {
go func(done <-chan struct{}, closed <-chan bool) {
select {
case <-done:
case <-closed:
cancel()
}
}(ctx.Done(), cn.CloseNotify())
}
defer cancel()
defer util.Close(stream, handler.logger)
handler.pump.StartK8sStreamWithHeartBeat(w, isReconnect, stream, err)
}

func (handler *K8sApplicationRestHandlerImpl) DownloadPodLogs(w http.ResponseWriter, r *http.Request) {
token := r.Header.Get("token")
request, err := handler.k8sApplicationService.ValidatePodLogsRequestQuery(r)
if err != nil {
common.WriteJsonResp(w, err, nil, http.StatusBadRequest)
return
}
handler.requestValidationAndRBAC(w, r, token, request)

// just to make sure follow flag is set to false when downloading logs
request.K8sRequest.PodLogsRequest.Follow = false

stream, err := handler.k8sApplicationService.GetPodLogs(r.Context(), request)
if err != nil {
common.WriteJsonResp(w, err, nil, http.StatusInternalServerError)
return
}
ctx, cancel := context.WithCancel(r.Context())
if cn, ok := w.(http.CloseNotifier); ok {
go func(done <-chan struct{}, closed <-chan bool) {
select {
case <-done:
case <-closed:
cancel()
}
}(ctx.Done(), cn.CloseNotify())
}
defer cancel()
defer util.Close(stream, handler.logger)

var dataBuffer bytes.Buffer
bufReader := bufio.NewReader(stream)
eof := false
for !eof {
log, err := bufReader.ReadString('\n')
log = strings.TrimSpace(log) // Remove trailing line ending
a := regexp.MustCompile(" ")
var res []byte
splitLog := a.Split(log, 2)
if len(splitLog[0]) > 0 {
parsedTime, err := time.Parse(time.RFC3339, splitLog[0])
if err != nil {
common.WriteJsonResp(w, err, nil, http.StatusInternalServerError)
return
}
gmtTimeLoc := time.FixedZone(bean2.LocalTimezoneInGMT, bean2.LocalTimeOffset)
humanReadableTime := parsedTime.In(gmtTimeLoc).Format(time.RFC1123)
res = append(res, humanReadableTime...)
}

if len(splitLog) == 2 {
res = append(res, " "...)
res = append(res, splitLog[1]...)
}
res = append(res, "\n"...)
if err == io.EOF {
eof = true
// stop if we reached end of stream and the next line is empty
if log == "" {
break
}
} else if err != nil && err != io.EOF {
common.WriteJsonResp(w, err, nil, http.StatusInternalServerError)
return
}
_, err = dataBuffer.Write(res)
if err != nil {
common.WriteJsonResp(w, err, nil, http.StatusInternalServerError)
return
}
}
if len(dataBuffer.Bytes()) == 0 {
common.WriteJsonResp(w, nil, nil, http.StatusNoContent)
return
}
podLogsFilename := generatePodLogsFilename(request.K8sRequest.ResourceIdentifier.Name)
common.WriteOctetStreamResp(w, r, dataBuffer.Bytes(), podLogsFilename)
return
}

func generatePodLogsFilename(filename string) string {
return fmt.Sprintf("podlogs-%s-%s.log", filename, uuid.New().String())
}

func (handler *K8sApplicationRestHandlerImpl) requestValidationAndRBAC(w http.ResponseWriter, r *http.Request, token string, request *k8s.ResourceRequestBean) {
if request.AppIdentifier != nil {
if request.DeploymentType == bean2.HelmInstalledType {
valid, err := handler.k8sApplicationService.ValidateResourceRequest(r.Context(), request.AppIdentifier, request.K8sRequest)
if err != nil || !valid {
handler.logger.Errorw("error in validating resource request", "err", err)
handler.logger.Errorw("error in validating resource request", "err", err, "request.AppIdentifier", request.AppIdentifier, "request.K8sRequest", request.K8sRequest)
apiError := util2.ApiError{
InternalMessage: "failed to validate the resource with error " + err.Error(),
UserMessage: "Failed to validate resource",
Expand Down Expand Up @@ -671,34 +793,6 @@ func (handler *K8sApplicationRestHandlerImpl) GetPodLogs(w http.ResponseWriter,
common.WriteJsonResp(w, errors.New("can not get pod logs as target cluster is not provided"), nil, http.StatusBadRequest)
return
}
lastEventId := r.Header.Get("Last-Event-ID")
isReconnect := false
if len(lastEventId) > 0 {
lastSeenMsgId, err := strconv.ParseInt(lastEventId, 10, 64)
if err != nil {
common.WriteJsonResp(w, err, nil, http.StatusBadRequest)
return
}
lastSeenMsgId = lastSeenMsgId + 1 //increased by one ns to avoid duplicate
t := v1.Unix(0, lastSeenMsgId)
request.K8sRequest.PodLogsRequest.SinceTime = &t
isReconnect = true
}
stream, err := handler.k8sApplicationService.GetPodLogs(r.Context(), request)
//err is handled inside StartK8sStreamWithHeartBeat method
ctx, cancel := context.WithCancel(r.Context())
if cn, ok := w.(http.CloseNotifier); ok {
go func(done <-chan struct{}, closed <-chan bool) {
select {
case <-done:
case <-closed:
cancel()
}
}(ctx.Done(), cn.CloseNotify())
}
defer cancel()
defer util.Close(stream, handler.logger)
handler.pump.StartK8sStreamWithHeartBeat(w, isReconnect, stream, err)
}

func (handler *K8sApplicationRestHandlerImpl) GetTerminalSession(w http.ResponseWriter, r *http.Request) {
Expand Down
8 changes: 4 additions & 4 deletions api/k8s/application/k8sApplicationRouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ func (impl *K8sApplicationRouterImpl) InitK8sApplicationRouter(k8sAppRouter *mux

k8sAppRouter.Path("/pods/logs/{podName}").
Queries("containerName", "{containerName}").
//Queries("containerName", "{containerName}", "appId", "{appId}").
//Queries("clusterId", "{clusterId}", "namespace", "${namespace}").
//Queries("sinceSeconds", "{sinceSeconds}").
Queries("follow", "{follow}").
Queries("tailLines", "{tailLines}").
HandlerFunc(impl.k8sApplicationRestHandler.GetPodLogs).Methods("GET")

k8sAppRouter.Path("/pods/logs/download/{podName}").
Queries("containerName", "{containerName}").
HandlerFunc(impl.k8sApplicationRestHandler.DownloadPodLogs).Methods("GET")

k8sAppRouter.Path("/pod/exec/session/{identifier}/{namespace}/{pod}/{shell}/{container}").
HandlerFunc(impl.k8sApplicationRestHandler.GetTerminalSession).Methods("GET")
k8sAppRouter.PathPrefix("/pod/exec/sockjs/ws").Handler(terminal.CreateAttachHandler("/pod/exec/sockjs/ws"))
Expand Down
4 changes: 2 additions & 2 deletions api/restHandler/common/ApiResponseWriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ func WriteApiJsonResponseStructured(w http.ResponseWriter, apiResponse *ApiRespo
}

func WriteOctetStreamResp(w http.ResponseWriter, r *http.Request, byteArr []byte, defaultFilename string) {
w.WriteHeader(http.StatusOK)
w.Header().Set(CONTENT_TYPE, "application/octet-stream")
if defaultFilename != "" {
w.Header().Set(CONTENT_DISPOSITION, "attachment; filename="+defaultFilename)
}
w.Header().Set(CONTENT_TYPE, "application/octet-stream")
w.Header().Set(CONTENT_LENGTH, r.Header.Get(CONTENT_LENGTH))
w.WriteHeader(http.StatusOK)
w.Write(byteArr)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/davecgh/go-spew v1.1.1
github.com/deckarep/golang-set v1.8.0
github.com/devtron-labs/authenticator v0.4.33
github.com/devtron-labs/common-lib v0.0.11
github.com/devtron-labs/common-lib v0.0.12
github.com/devtron-labs/protos v0.0.0-20230503113602-282404f70fd2
github.com/evanphx/json-patch v5.6.0+incompatible
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ github.com/denisenkom/go-mssqldb v0.0.0-20190707035753-2be1aa521ff4 h1:YcpmyvADG
github.com/denisenkom/go-mssqldb v0.0.0-20190707035753-2be1aa521ff4/go.mod h1:zAg7JM8CkOJ43xKXIj7eRO9kmWm/TW578qo+oDO6tuM=
github.com/devtron-labs/authenticator v0.4.33 h1:FpAV3ZgFluaRFcMwPpwxr/mwSipJ16XRvgABq3BzP5Y=
github.com/devtron-labs/authenticator v0.4.33/go.mod h1:ozNfT8WcruiSgnUbyp48WVfc41++W6xYXhKFp67lNTU=
github.com/devtron-labs/common-lib v0.0.11 h1:xyVjD09miYhOKt0Oc//kOBWJas/OXsP7dyRoA1Hg90U=
github.com/devtron-labs/common-lib v0.0.11/go.mod h1:95/DizzVXu1kHap/VwEvdxwgd+BvPVYc0bJzt8yqGDU=
github.com/devtron-labs/common-lib v0.0.12 h1:HirqTWtaXWPbfGeqQurjtn26b2Az7sMFZ1JAAz2koNM=
github.com/devtron-labs/common-lib v0.0.12/go.mod h1:95/DizzVXu1kHap/VwEvdxwgd+BvPVYc0bJzt8yqGDU=
github.com/devtron-labs/protos v0.0.0-20230503113602-282404f70fd2 h1:/IEIsJTxDZ3hv8uOoCaqdWCXqcv7nCAgX9AP/v84dUY=
github.com/devtron-labs/protos v0.0.0-20230503113602-282404f70fd2/go.mod h1:l85jxWHlcSo910hdUfRycL40yGzC6glE93V1sVxVPto=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
Expand Down
12 changes: 12 additions & 0 deletions pkg/k8s/application/bean/bean.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@ const (
ArgoInstalledType = 1 // Identifier for ArgoCD deployment
)

const (
LastEventID = "Last-Event-ID"
TimestampOffsetToAvoidDuplicateLogs = 1
IntegerBase = 10
IntegerBitSize = 64
)

const (
LocalTimezoneInGMT = "GMT+0530"
LocalTimeOffset = 5*60*60 + 30*60
)

type ResourceInfo struct {
PodName string `json:"podName"`
}
Expand Down
49 changes: 40 additions & 9 deletions pkg/k8s/application/k8sApplicationService.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,32 @@ func (impl *K8sApplicationServiceImpl) ValidatePodLogsRequestQuery(r *http.Reque
v, vars := r.URL.Query(), mux.Vars(r)
request := &k8s.ResourceRequestBean{}
podName := vars["podName"]
/*sinceSeconds, err := strconv.Atoi(v.Get("sinceSeconds"))
if err != nil {
sinceSeconds = 0
}*/
sinceSecondsParam := v.Get("sinceSeconds")
var sinceSeconds int
var err error
if len(sinceSecondsParam) > 0 {
sinceSeconds, err = strconv.Atoi(sinceSecondsParam)
if err != nil || sinceSeconds <= 0 {
return nil, &util.ApiError{
Code: "400",
HttpStatusCode: 400,
UserMessage: "invalid value provided for sinceSeconds",
InternalMessage: "invalid value provided for sinceSeconds"}
}
}
sinceTimeParam := v.Get("sinceTime")
sinceTime := metav1.Unix(0, 0)
if len(sinceTimeParam) > 0 {
sinceTimeVar, err := strconv.ParseInt(sinceTimeParam, 10, 64)
if err != nil || sinceTimeVar <= 0 {
return nil, &util.ApiError{
Code: "400",
HttpStatusCode: 400,
UserMessage: "invalid value provided for sinceTime",
InternalMessage: "invalid value provided for sinceTime"}
}
sinceTime = metav1.Unix(sinceTimeVar, 0)
}
containerName, clusterIdString := v.Get("containerName"), v.Get("clusterId")
prevContainerLogs := v.Get("previous")
isPrevLogs, err := strconv.ParseBool(prevContainerLogs)
Expand All @@ -130,17 +152,26 @@ func (impl *K8sApplicationServiceImpl) ValidatePodLogsRequestQuery(r *http.Reque
if err != nil {
follow = false
}
tailLines, err := strconv.Atoi(v.Get("tailLines"))
if err != nil {
tailLines = 0
tailLinesParam := v.Get("tailLines")
var tailLines int
if len(tailLinesParam) > 0 {
tailLines, err = strconv.Atoi(tailLinesParam)
if err != nil || tailLines <= 0 {
return nil, &util.ApiError{
Code: "400",
HttpStatusCode: 400,
UserMessage: "invalid value provided for tailLines",
InternalMessage: "invalid value provided for tailLines"}
}
}
k8sRequest := &k8s2.K8sRequestBean{
ResourceIdentifier: k8s2.ResourceIdentifier{
Name: podName,
GroupVersionKind: schema.GroupVersionKind{},
},
PodLogsRequest: k8s2.PodLogsRequest{
//SinceTime: sinceSeconds,
SinceSeconds: sinceSeconds,
SinceTime: &sinceTime,
TailLines: tailLines,
Follow: follow,
ContainerName: containerName,
Expand Down Expand Up @@ -315,7 +346,7 @@ func (impl *K8sApplicationServiceImpl) GetPodLogs(ctx context.Context, request *

resourceIdentifier := request.K8sRequest.ResourceIdentifier
podLogsRequest := request.K8sRequest.PodLogsRequest
resp, err := impl.K8sUtil.GetPodLogs(ctx, restConfig, resourceIdentifier.Name, resourceIdentifier.Namespace, podLogsRequest.SinceTime, podLogsRequest.TailLines, podLogsRequest.Follow, podLogsRequest.ContainerName, podLogsRequest.IsPrevContainerLogsEnabled)
resp, err := impl.K8sUtil.GetPodLogs(ctx, restConfig, resourceIdentifier.Name, resourceIdentifier.Namespace, podLogsRequest.SinceTime, podLogsRequest.TailLines, podLogsRequest.SinceSeconds, podLogsRequest.Follow, podLogsRequest.ContainerName, podLogsRequest.IsPrevContainerLogsEnabled)
if err != nil {
impl.logger.Errorw("error in getting pod logs", "err", err, "clusterId", clusterId)
return nil, err
Expand Down
5 changes: 5 additions & 0 deletions pkg/k8s/bean.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ type ResourceRequestBean struct {
ClusterId int `json:"clusterId"` // clusterId is used when request is for direct cluster (not for helm release)
}

type LogsDownloadBean struct {
FileName string `json:"fileName"`
LogsData string `json:"data"`
}

type BatchResourceResponse struct {
ManifestResponse *k8s.ManifestResponse
Err error
Expand Down
Loading

0 comments on commit 95f7e2d

Please sign in to comment.