diff --git a/auth/basic_auth.go b/auth/basic_auth.go index 6253b10e..4f225ef1 100644 --- a/auth/basic_auth.go +++ b/auth/basic_auth.go @@ -64,7 +64,7 @@ func (a *auth) BasicAuth() echo.MiddlewareFunc { printInMiddleware := true defer func() { if printInMiddleware { - a.logger.Log(ctx).Send() + a.logger.Log(ctx) } }() diff --git a/auth/signin.go b/auth/signin.go index 1d738619..3dc6bd22 100644 --- a/auth/signin.go +++ b/auth/signin.go @@ -11,11 +11,8 @@ import ( func (a *auth) SignIn(ctx echo.Context) error { ctx.Set(types.HandlerStartTime, time.Now()) - defer func() { - a.logger.Log(ctx).Send() - }() - var user User + var user User if err := json.NewDecoder(ctx.Request().Body).Decode(&user); err != nil { return ctx.JSON(http.StatusBadRequest, echo.Map{ "error": err.Error(), @@ -26,6 +23,7 @@ func (a *auth) SignIn(ctx echo.Context) error { "error": "email and username cannot be empty, please provide at least one of them", } ctx.Set(types.HttpEndpointErrorKey, errMsg) + a.logger.Log(ctx) return ctx.JSON(http.StatusBadRequest, errMsg) } @@ -34,6 +32,7 @@ func (a *auth) SignIn(ctx echo.Context) error { "error": "password cannot be empty", } ctx.Set(types.HttpEndpointErrorKey, errMsg) + a.logger.Log(ctx) return ctx.JSON(http.StatusBadRequest, errMsg) } @@ -42,6 +41,7 @@ func (a *auth) SignIn(ctx echo.Context) error { if user.Email != "" { if err := verifyEmail(user.Email); err != nil { ctx.Set(types.HttpEndpointErrorKey, err.Error()) + a.logger.Log(ctx) return ctx.JSON(http.StatusBadRequest, echo.Map{ "error": err.Error(), }) @@ -55,6 +55,7 @@ func (a *auth) SignIn(ctx echo.Context) error { userFromDb, err := a.pgStore.GetUser(ctx.Request().Context(), key) if err != nil { ctx.Set(types.HttpEndpointErrorKey, err.Error()) + a.logger.Log(ctx) return ctx.JSON(http.StatusBadRequest, echo.Map{ "error": err.Error(), }) @@ -63,6 +64,7 @@ func (a *auth) SignIn(ctx echo.Context) error { if !a.verifyPassword(userFromDb.Password, user.Password) { errMsg := "invalid password" ctx.Set(types.HttpEndpointErrorKey, errMsg) + a.logger.Log(ctx) return ctx.JSON(http.StatusUnauthorized, errMsg) } @@ -75,6 +77,7 @@ func (a *auth) SignIn(ctx echo.Context) error { token, err := a.newToken(uu, tokenLife) if err != nil { ctx.Set(types.HttpEndpointErrorKey, err.Error()) + a.logger.Log(ctx) return ctx.JSON(http.StatusInternalServerError, echo.Map{ "error": err.Error(), }) diff --git a/auth/signup.go b/auth/signup.go index 21eb770c..e3521368 100644 --- a/auth/signup.go +++ b/auth/signup.go @@ -140,14 +140,11 @@ func verifyPassword(password string) error { func (a *auth) SignUp(ctx echo.Context) error { ctx.Set(types.HandlerStartTime, time.Now()) - defer func() { - a.logger.Log(ctx).Send() - }() var u User - if err := json.NewDecoder(ctx.Request().Body).Decode(&u); err != nil { ctx.Set(types.HttpEndpointErrorKey, err.Error()) + a.logger.Log(ctx) return ctx.JSON(http.StatusBadRequest, echo.Map{ "error": err.Error(), "message": "error decoding request body in sign-up", @@ -157,6 +154,7 @@ func (a *auth) SignUp(ctx echo.Context) error { if err := u.Validate(a.store); err != nil { ctx.Set(types.HttpEndpointErrorKey, err.Error()) + a.logger.Log(ctx) return ctx.JSON(http.StatusBadRequest, echo.Map{ "error": err.Error(), }) @@ -165,6 +163,7 @@ func (a *auth) SignUp(ctx echo.Context) error { hpwd, err := a.hashPassword(u.Password) if err != nil { ctx.Set(types.HttpEndpointErrorKey, err.Error()) + a.logger.Log(ctx) return ctx.JSON(http.StatusInternalServerError, echo.Map{ "error": err.Error(), }) @@ -180,6 +179,7 @@ func (a *auth) SignUp(ctx echo.Context) error { err = a.pgStore.AddUser(ctx.Request().Context(), newUser) if err != nil { ctx.Set(types.HttpEndpointErrorKey, err.Error()) + a.logger.Log(ctx) return ctx.JSON(http.StatusInternalServerError, echo.Map{ "error": err.Error(), }) diff --git a/auth/token.go b/auth/token.go index beca3794..db7cd619 100644 --- a/auth/token.go +++ b/auth/token.go @@ -18,21 +18,20 @@ func (a *auth) Token(ctx echo.Context) error { // TODO (jay-dee7) - check for all valid query params here like serive, client_id, offline_token, etc // more at this link - https://docs.docker.com/registry/spec/auth/token/ ctx.Set(types.HandlerStartTime, time.Now()) - defer func() { - a.logger.Log(ctx).Send() - }() authHeader := ctx.Request().Header.Get(AuthorizationHeaderKey) if authHeader != "" { username, password, err := a.getCredsFromHeader(ctx.Request()) if err != nil { ctx.Set(types.HttpEndpointErrorKey, err.Error()) + a.logger.Log(ctx) return ctx.NoContent(http.StatusUnauthorized) } creds, err := a.validateUser(username, password) if err != nil { ctx.Set(types.HttpEndpointErrorKey, err.Error()) + a.logger.Log(ctx) return ctx.JSON(http.StatusUnauthorized, echo.Map{ "error": err.Error(), }) @@ -48,6 +47,7 @@ func (a *auth) Token(ctx echo.Context) error { "msg": "invalid scope provided", } ctx.Set(types.HttpEndpointErrorKey, errMsg) + a.logger.Log(ctx) return ctx.JSON(http.StatusBadRequest, errMsg) } @@ -57,6 +57,7 @@ func (a *auth) Token(ctx echo.Context) error { token, err := a.newPublicPullToken() if err != nil { ctx.Set(types.HttpEndpointErrorKey, err.Error()) + a.logger.Log(ctx) return ctx.NoContent(http.StatusInternalServerError) } diff --git a/go.mod b/go.mod index 07505b3b..556d91cd 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/go-playground/validator/v10 v10.9.0 github.com/golang-jwt/jwt v3.2.2+incompatible github.com/google/uuid v1.3.0 + github.com/hashicorp/go-multierror v1.0.0 github.com/labstack/echo-contrib v0.11.0 github.com/labstack/echo/v4 v4.5.0 github.com/rs/zerolog v1.24.0 @@ -18,6 +19,8 @@ require ( golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 ) +require github.com/hashicorp/errwrap v1.0.0 // indirect + require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash v1.1.0 // indirect diff --git a/go.sum b/go.sum index c3be02ec..b6ef6851 100644 --- a/go.sum +++ b/go.sum @@ -266,10 +266,12 @@ github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBt github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE= github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= github.com/hashicorp/consul/sdk v0.3.0/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= +github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU= github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= diff --git a/main.go b/main.go index 89ed2fed..cc002150 100644 --- a/main.go +++ b/main.go @@ -44,7 +44,7 @@ func main() { os.Exit(1) } - logger := telemetry.ZLogger(telemetry.SetupLogger(), fluentBitCollector) + logger := telemetry.ZLogger(fluentBitCollector, cfg.Environment) authSvc := auth.New(localCache, cfg, pgStore, logger) skynetClient := skynet.NewClient(cfg) diff --git a/registry/v2/blobs.go b/registry/v2/blobs.go index 47c2a020..a887af3c 100644 --- a/registry/v2/blobs.go +++ b/registry/v2/blobs.go @@ -33,9 +33,6 @@ func (b *blobs) errorResponse(code, msg string, detail map[string]interface{}) [ func (b *blobs) HEAD(ctx echo.Context) error { ctx.Set(types.HandlerStartTime, time.Now()) - defer func() { - b.registry.logger.Log(ctx).Send() - }() digest := ctx.Param("digest") @@ -47,6 +44,7 @@ func (b *blobs) HEAD(ctx echo.Context) error { errMsg := b.errorResponse(RegistryErrorCodeManifestBlobUnknown, err.Error(), details) ctx.Set(types.HttpEndpointErrorKey, errMsg) + b.registry.logger.Log(ctx) return ctx.JSONBlob(http.StatusNotFound, errMsg) } @@ -58,6 +56,7 @@ func (b *blobs) HEAD(ctx echo.Context) error { } errMsg := b.errorResponse(RegistryErrorCodeManifestBlobUnknown, "Manifest does not exist", details) ctx.Set(types.HttpEndpointErrorKey, errMsg) + b.registry.logger.Log(ctx) return ctx.JSONBlob(http.StatusNotFound, errMsg) } @@ -74,9 +73,6 @@ these will be part of the txn in StartUpload */ func (b *blobs) UploadBlob(ctx echo.Context) error { ctx.Set(types.HandlerStartTime, time.Now()) - defer func() { - b.registry.logger.Log(ctx).Send() - }() namespace := ctx.Param("username") + "/" + ctx.Param("imagename") contentRange := ctx.Request().Header.Get("Content-Range") @@ -90,7 +86,7 @@ func (b *blobs) UploadBlob(ctx echo.Context) error { nil, ) ctx.Set(types.HttpEndpointErrorKey, errMsg) - + b.registry.logger.Log(ctx) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } @@ -112,6 +108,7 @@ func (b *blobs) UploadBlob(ctx echo.Context) error { nil, ) ctx.Set(types.HttpEndpointErrorKey, errMsg) + b.registry.logger.Log(ctx) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } @@ -130,12 +127,14 @@ func (b *blobs) UploadBlob(ctx echo.Context) error { } errMsg := b.errorResponse(RegistryErrorCodeBlobUploadUnknown, err.Error(), details) ctx.Set(types.HttpEndpointErrorKey, errMsg) + b.registry.logger.Log(ctx) return ctx.JSONBlob(http.StatusRequestedRangeNotSatisfiable, errMsg) } if start != len(b.uploads[uuid]) { errMsg := b.errorResponse(RegistryErrorCodeBlobUploadUnknown, "content range mismatch", nil) ctx.Set(types.HttpEndpointErrorKey, errMsg) + b.registry.logger.Log(ctx) return ctx.JSONBlob(http.StatusRequestedRangeNotSatisfiable, errMsg) } @@ -148,6 +147,7 @@ func (b *blobs) UploadBlob(ctx echo.Context) error { nil, ) ctx.Set(types.HttpEndpointErrorKey, errMsg) + b.registry.logger.Log(ctx) return ctx.JSONBlob(http.StatusInternalServerError, errMsg) } // 10 ctx.Request().Body.Close() @@ -160,6 +160,7 @@ func (b *blobs) UploadBlob(ctx echo.Context) error { nil, ) ctx.Set(types.HttpEndpointErrorKey, errMsg) + b.registry.logger.Log(ctx) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } locationHeader := fmt.Sprintf("/v2/%s/blobs/uploads/%s", namespace, uuid) diff --git a/registry/v2/registry.go b/registry/v2/registry.go index 70aa7fd4..a5001a1c 100644 --- a/registry/v2/registry.go +++ b/registry/v2/registry.go @@ -64,9 +64,6 @@ func (r *registry) LayerExists(ctx echo.Context) error { // OK func (r *registry) ManifestExists(ctx echo.Context) error { ctx.Set(types.HandlerStartTime, time.Now()) - defer func() { - r.logger.Log(ctx).Send() - }() namespace := ctx.Param("username") + "/" + ctx.Param("imagename") ref := ctx.Param("reference") // ref can be either tag or digest @@ -81,6 +78,7 @@ func (r *registry) ManifestExists(ctx echo.Context) error { errMsg := r.errorResponse(RegistryErrorCodeManifestBlobUnknown, err.Error(), details) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusNotFound, errMsg) } @@ -93,6 +91,7 @@ func (r *registry) ManifestExists(ctx echo.Context) error { errMsg := r.errorResponse(RegistryErrorCodeManifestBlobUnknown, "Manifest does not exist", detail) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusNotFound, errMsg) } @@ -105,6 +104,7 @@ func (r *registry) ManifestExists(ctx echo.Context) error { r.logger.Error(details) errMsg := r.errorResponse(RegistryErrorCodeManifestInvalid, "manifest digest does not match", nil) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } @@ -121,9 +121,6 @@ func (r *registry) ManifestExists(ctx echo.Context) error { // OK func (r *registry) Catalog(ctx echo.Context) error { ctx.Set(types.HandlerStartTime, time.Now()) - defer func() { - r.logger.Log(ctx).Send() - }() queryParamPageSize := ctx.QueryParam("n") queryParamOffset := ctx.QueryParam("last") @@ -134,6 +131,7 @@ func (r *registry) Catalog(ctx echo.Context) error { ps, err := strconv.ParseInt(ctx.QueryParam("n"), 10, 64) if err != nil { ctx.Set(types.HttpEndpointErrorKey, err.Error()) + r.logger.Log(ctx) return ctx.JSON(http.StatusBadRequest, echo.Map{ "error": err.Error(), }) @@ -145,6 +143,7 @@ func (r *registry) Catalog(ctx echo.Context) error { o, err := strconv.ParseInt(ctx.QueryParam("last"), 10, 64) if err != nil { ctx.Set(types.HttpEndpointErrorKey, err.Error()) + r.logger.Log(ctx) return ctx.JSON(http.StatusBadRequest, echo.Map{ "error": err.Error(), }) @@ -155,6 +154,7 @@ func (r *registry) Catalog(ctx echo.Context) error { catalogList, err := r.store.GetCatalog(ctx.Request().Context(), namespace, pageSize, offset) if err != nil { ctx.Set(types.HttpEndpointErrorKey, err.Error()) + r.logger.Log(ctx) return ctx.JSON(http.StatusInternalServerError, echo.Map{ "error": err.Error(), }) @@ -162,6 +162,7 @@ func (r *registry) Catalog(ctx echo.Context) error { total, err := r.store.GetCatalogCount(ctx.Request().Context()) if err != nil { ctx.Set(types.HttpEndpointErrorKey, err.Error()) + r.logger.Log(ctx) return ctx.JSON(http.StatusInternalServerError, echo.Map{ "error": err.Error(), }) @@ -178,9 +179,6 @@ func (r *registry) Catalog(ctx echo.Context) error { // OK func (r *registry) ListTags(ctx echo.Context) error { ctx.Set(types.HandlerStartTime, time.Now()) - defer func() { - r.logger.Log(ctx).Send() - }() namespace := ctx.Param("username") + "/" + ctx.Param("imagename") limit := ctx.QueryParam("n") @@ -189,7 +187,7 @@ func (r *registry) ListTags(ctx echo.Context) error { if err != nil { errMsg := r.errorResponse(RegistryErrorCodeTagInvalid, err.Error(), nil) ctx.Set(types.HttpEndpointErrorKey, errMsg) - + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusNotFound, errMsg) } @@ -198,7 +196,7 @@ func (r *registry) ListTags(ctx echo.Context) error { if err != nil { errMsg := r.errorResponse(RegistryErrorCodeTagInvalid, err.Error(), nil) ctx.Set(types.HttpEndpointErrorKey, errMsg) - + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusNotFound, errMsg) } if n > 0 { @@ -222,6 +220,8 @@ func (r *registry) List(ctx echo.Context) error { // GET /v2//manifests/ // OK func (r *registry) PullManifest(ctx echo.Context) error { + ctx.Set(types.HandlerStartTime, time.Now()) + namespace := ctx.Param("username") + "/" + ctx.Param("imagename") ref := ctx.Param("reference") @@ -229,12 +229,14 @@ func (r *registry) PullManifest(ctx echo.Context) error { if err != nil { errMsg := r.errorResponse(RegistryErrorCodeManifestUnknown, err.Error(), nil) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusNotFound, errMsg) } resp, err := r.skynet.Download(manifest.Skylink) if err != nil { errMsg := r.errorResponse(RegistryErrorCodeManifestInvalid, err.Error(), nil) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusNotFound, errMsg) } @@ -242,6 +244,7 @@ func (r *registry) PullManifest(ctx echo.Context) error { if err != nil { errMsg := r.errorResponse(RegistryErrorCodeManifestInvalid, err.Error(), nil) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusNotFound, errMsg) } _ = resp.Close() @@ -258,9 +261,6 @@ func (r *registry) PullManifest(ctx echo.Context) error { func (r *registry) PullLayer(ctx echo.Context) error { //namespace := ctx.Param("username") + "/" + ctx.Param("imagename") ctx.Set(types.HandlerStartTime, time.Now()) - defer func() { - r.logger.Log(ctx).Send() - }() clientDigest := ctx.Param("digest") @@ -268,6 +268,7 @@ func (r *registry) PullLayer(ctx echo.Context) error { if err != nil { errMsg := r.errorResponse(RegistryErrorCodeBlobUnknown, err.Error(), nil) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusNotFound, errMsg) } @@ -279,6 +280,7 @@ func (r *registry) PullLayer(ctx echo.Context) error { e := fmt.Errorf("skylink is empty").Error() errMsg := r.errorResponse(RegistryErrorCodeBlobUnknown, e, detail) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusNotFound, errMsg) } @@ -290,12 +292,14 @@ func (r *registry) PullLayer(ctx echo.Context) error { } errMsg := r.errorResponse(RegistryErrorCodeBlobUnknown, err.Error(), detail) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusNotFound, errMsg) } buf := &bytes.Buffer{} if _, err := io.Copy(buf, resp); err != nil { errMsg := r.errorResponse(RegistryErrorCodeBlobUploadInvalid, err.Error(), nil) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusInternalServerError, errMsg) } _ = resp.Close() @@ -312,6 +316,7 @@ func (r *registry) PullLayer(ctx echo.Context) error { details, ) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusNotFound, errMsg) } @@ -323,6 +328,8 @@ func (r *registry) PullLayer(ctx echo.Context) error { // MonolithicUpload // PUT /v2//blobs/uploads/?digest= func (r *registry) MonolithicUpload(ctx echo.Context) error { + ctx.Set(types.HandlerStartTime, time.Now()) + namespace := ctx.Param("username") + "/" + ctx.Param("imagename") uuid := ctx.Param("uuid") digest := ctx.QueryParam("digest") @@ -331,6 +338,7 @@ func (r *registry) MonolithicUpload(ctx echo.Context) error { if _, err := io.Copy(buf, ctx.Request().Body); err != nil { errMsg := r.errorResponse(RegistryErrorCodeBlobUploadInvalid, err.Error(), nil) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } _ = ctx.Request().Body.Close() @@ -343,6 +351,7 @@ func (r *registry) MonolithicUpload(ctx echo.Context) error { } errMsg := r.errorResponse(RegistryErrorCodeBlobUploadInvalid, err.Error(), detail) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusInternalServerError, buf.Bytes()) } @@ -359,6 +368,7 @@ func (r *registry) MonolithicUpload(ctx echo.Context) error { if err != nil { errMsg := r.errorResponse(RegistryErrorCodeBlobUploadInvalid, err.Error(), nil) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } @@ -381,9 +391,6 @@ registry.tnxMap[uuid] = {txn,blobs[],timeout} // POST /v2//blobs/uploads/ func (r *registry) StartUpload(ctx echo.Context) error { ctx.Set(types.HandlerStartTime, time.Now()) - defer func() { - r.logger.Log(ctx).Send() - }() namespace := ctx.Param("username") + "/" + ctx.Param("imagename") clientDigest := ctx.QueryParam("digest") @@ -402,6 +409,7 @@ func (r *registry) StartUpload(ctx echo.Context) error { ) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusNotFound, errMsg) } _ = ctx.Request().Body.Close() // why defer? body is already read :) @@ -418,7 +426,7 @@ func (r *registry) StartUpload(ctx echo.Context) error { details, ) ctx.Set(types.HttpEndpointErrorKey, errMsg) - + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } @@ -426,7 +434,7 @@ func (r *registry) StartUpload(ctx echo.Context) error { if err != nil { errMsg := r.errorResponse(RegistryErrorCodeBlobUploadInvalid, err.Error(), nil) ctx.Set(types.HttpEndpointErrorKey, errMsg) - + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusRequestedRangeNotSatisfiable, errMsg) } @@ -443,23 +451,26 @@ func (r *registry) StartUpload(ctx echo.Context) error { if err != nil { errMsg := r.errorResponse(RegistryErrorCodeUnknown, err.Error(), nil) ctx.Set(types.HttpEndpointErrorKey, errMsg) - + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusInternalServerError, errMsg) } if err := r.store.SetLayer(ctx.Request().Context(), txnOp, layerV2); err != nil { errMsg := r.errorResponse(RegistryErrorCodeBlobUploadInvalid, err.Error(), nil) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } if err := r.store.Commit(ctx.Request().Context(), txnOp); err != nil { errMsg := r.errorResponse(RegistryErrorCodeBlobUploadInvalid, err.Error(), nil) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } link := r.getHttpUrlFromSkylink(skylink) ctx.Response().Header().Set("Location", link) + r.logger.Log(ctx) return ctx.NoContent(http.StatusCreated) } @@ -473,6 +484,7 @@ func (r *registry) StartUpload(ctx echo.Context) error { nil, ) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusInternalServerError, errMsg) } r.txnMap[id.String()] = TxnStore{ @@ -488,11 +500,9 @@ func (r *registry) StartUpload(ctx echo.Context) error { return ctx.NoContent(http.StatusAccepted) } +//UploadProgress TODO func (r *registry) UploadProgress(ctx echo.Context) error { ctx.Set(types.HandlerStartTime, time.Now()) - defer func() { - r.logger.Log(ctx).Send() - }() namespace := ctx.Param("username") + "/" + ctx.Param("imagename") uuid := ctx.Param("uuid") @@ -533,6 +543,8 @@ and inserted in the blob table thus committing the txn */ func (r *registry) CompleteUpload(ctx echo.Context) error { + ctx.Set(types.HandlerStartTime, time.Now()) + dig := ctx.QueryParam("digest") namespace := ctx.Param("username") + "/" + ctx.Param("imagename") id := ctx.Param("uuid") @@ -541,6 +553,7 @@ func (r *registry) CompleteUpload(ctx echo.Context) error { if _, err := io.Copy(buf, ctx.Request().Body); err != nil { errMsg := r.errorResponse(RegistryErrorCodeDigestInvalid, err.Error(), nil) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } _ = ctx.Request().Body.Close() @@ -556,6 +569,7 @@ func (r *registry) CompleteUpload(ctx echo.Context) error { } errMsg := r.errorResponse(RegistryErrorCodeDigestInvalid, "digest mismatch", details) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } @@ -564,6 +578,7 @@ func (r *registry) CompleteUpload(ctx echo.Context) error { if err != nil { errMsg := r.errorResponse(RegistryErrorCodeBlobUploadInvalid, err.Error(), nil) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusRequestedRangeNotSatisfiable, errMsg) } @@ -579,6 +594,7 @@ func (r *registry) CompleteUpload(ctx echo.Context) error { if !ok { errMsg := r.errorResponse(RegistryErrorCodeUnknown, "transaction does not exist for uuid -"+id, nil) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } @@ -587,6 +603,7 @@ func (r *registry) CompleteUpload(ctx echo.Context) error { "error_detail": "set layer issues", }) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } @@ -595,6 +612,7 @@ func (r *registry) CompleteUpload(ctx echo.Context) error { "error_detail": "commitment issue", }) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } delete(r.txnMap, id) @@ -617,13 +635,11 @@ func (r *registry) PushImage(ctx echo.Context) error { } func (r *registry) PushManifest(ctx echo.Context) error { + ctx.Set(types.HandlerStartTime, time.Now()) + namespace := ctx.Param("username") + "/" + ctx.Param("imagename") ref := ctx.Param("reference") contentType := ctx.Request().Header.Get("Content-Type") - ctx.Set(types.HandlerStartTime, time.Now()) - defer func() { - r.logger.Log(ctx).Send() - }() var manifest ImageManifest @@ -641,6 +657,7 @@ func (r *registry) PushManifest(ctx echo.Context) error { if err != nil { errMsg := r.errorResponse(RegistryErrorCodeBlobUnknown, err.Error(), nil) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } dig := digest(buf.Bytes()) @@ -650,6 +667,7 @@ func (r *registry) PushManifest(ctx echo.Context) error { if err != nil { errMsg := r.errorResponse(RegistryErrorCodeManifestBlobUnknown, err.Error(), nil) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusNotFound, errMsg) } @@ -683,6 +701,7 @@ func (r *registry) PushManifest(ctx echo.Context) error { "reason": "PG_ERR_CREATE_NEW_TXN", }) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) _ = r.store.Abort(ctx.Request().Context(), txnOp) return ctx.JSONBlob(http.StatusInternalServerError, errMsg) } @@ -690,6 +709,7 @@ func (r *registry) PushManifest(ctx echo.Context) error { if err := r.store.SetManifest(ctx.Request().Context(), txnOp, val); err != nil { errMsg := r.errorResponse(RegistryErrorCodeUnknown, err.Error(), nil) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) _ = r.store.Abort(ctx.Request().Context(), txnOp) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } @@ -697,6 +717,7 @@ func (r *registry) PushManifest(ctx echo.Context) error { if err := r.store.SetConfig(ctx.Request().Context(), txnOp, mfc); err != nil { errMsg := r.errorResponse(RegistryErrorCodeUnknown, err.Error(), nil) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) _ = r.store.Abort(ctx.Request().Context(), txnOp) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } @@ -706,6 +727,7 @@ func (r *registry) PushManifest(ctx echo.Context) error { "reason": "ERR_PG_COMMIT_TXN", }) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) _ = r.store.Abort(ctx.Request().Context(), txnOp) return ctx.JSONBlob(http.StatusInternalServerError, errMsg) } @@ -721,9 +743,6 @@ func (r *registry) PushManifest(ctx echo.Context) error { // POST /v2//blobs/uploads/ func (r *registry) PushLayer(ctx echo.Context) error { ctx.Set(types.HandlerStartTime, time.Now()) - defer func() { - r.logger.Log(ctx).Send() - }() elem := strings.Split(ctx.Request().URL.Path, "/") elem = elem[1:] @@ -734,7 +753,7 @@ func (r *registry) PushLayer(ctx echo.Context) error { if len(elem) < 4 { errMsg := r.errorResponse(RegistryErrorCodeNameInvalid, "blobs must be attached to a repo", nil) ctx.Set(types.HttpEndpointErrorKey, errMsg) - + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusNotFound, errMsg) } @@ -755,6 +774,8 @@ func (r *registry) CancelUpload(ctx echo.Context) error { // DeleteTagOrManifest // DELETE /v2//manifest/ or func (r *registry) DeleteTagOrManifest(ctx echo.Context) error { + ctx.Set(types.HandlerStartTime, time.Now()) + namespace := ctx.Param("username") + "/" + ctx.Param("imagename") ref := ctx.Param("reference") @@ -773,6 +794,7 @@ func (r *registry) DeleteTagOrManifest(ctx echo.Context) error { } errMsg := r.errorResponse(RegistryErrorCodeManifestUnknown, err.Error(), details) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusNotFound, errMsg) } @@ -781,7 +803,8 @@ func (r *registry) DeleteTagOrManifest(ctx echo.Context) error { } func (r *registry) DeleteLayer(ctx echo.Context) error { - //namespace := ctx.Param("username") + "/" + ctx.Param("imagename") + ctx.Set(types.HandlerStartTime, time.Now()) + dig := ctx.Param("digest") //var m types.Metadata @@ -792,6 +815,7 @@ func (r *registry) DeleteLayer(ctx echo.Context) error { errMsg := r.errorResponse(RegistryErrorCodeBlobUnknown, err.Error(), nil) ctx.Set(types.HttpEndpointErrorKey, errMsg) + r.logger.Log(ctx) return ctx.JSONBlob(http.StatusNotFound, errMsg) } blobs := layer.BlobDigests @@ -808,6 +832,7 @@ func (r *registry) DeleteLayer(ctx echo.Context) error { bz, err := json.Marshal(logMsg) if err == nil { ctx.Set(types.HttpEndpointErrorKey, logMsg) + r.logger.Log(ctx) } return ctx.JSONBlob(http.StatusInternalServerError, bz) @@ -822,6 +847,7 @@ func (r *registry) DeleteLayer(ctx echo.Context) error { } ctx.Set(types.HttpEndpointErrorKey, logMsg) + r.logger.Log(ctx) bz, err := json.Marshal(logMsg) if err != nil { r.log.Err(err).Send() @@ -837,17 +863,13 @@ func (r *registry) DeleteLayer(ctx echo.Context) error { // Should also look into 401 Code // https://docs.docker.com/registry/spec/api/ func (r *registry) ApiVersion(ctx echo.Context) error { - ctx.Set(types.HandlerStartTime, time.Now()) - defer func() { - r.logger.Log(ctx).Send() - }() ctx.Response().Header().Set(HeaderDockerDistributionApiVersion, "registry/2.0") - return ctx.String(http.StatusOK, "OK\n") } func (r *registry) GetImageNamespace(ctx echo.Context) error { + searchQuery := ctx.QueryParam("search_query") if searchQuery == "" { return ctx.JSON(http.StatusBadRequest, echo.Map{ diff --git a/telemetry/consoleWriter.go b/telemetry/consoleWriter.go new file mode 100644 index 00000000..dd154837 --- /dev/null +++ b/telemetry/consoleWriter.go @@ -0,0 +1,88 @@ +package telemetry + +import ( + "bytes" + "os" + "time" + + "github.com/containerish/OpenRegistry/types" + "github.com/fatih/color" + "github.com/hashicorp/go-multierror" + "github.com/labstack/echo/v4" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +func (l logger) consoleWriter(ctx echo.Context) { + l.zlog = log.Output(zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: time.RFC822}) + l.zlog = l.zlog.With().Caller().Logger() + + buf := l.pool.Get().(*bytes.Buffer) + buf.Reset() + defer l.pool.Put(buf) + + req := ctx.Request() + res := ctx.Response() + + status := res.Status + level := zerolog.InfoLevel + switch { + case status >= 500: + level = zerolog.ErrorLevel + case status >= 400: + level = zerolog.WarnLevel + case status >= 300: + level = zerolog.ErrorLevel + } + + var e multierror.Error + + _, err := buf.WriteString(req.Host + " ") + if err != nil { + e.Errors = append(e.Errors, err) + } + + _, err = buf.WriteString(req.Method + " ") + if err != nil { + e.Errors = append(e.Errors, err) + } + + _, err = buf.WriteString(req.RequestURI + " ") + if err != nil { + e.Errors = append(e.Errors, err) + } + + _, err = buf.WriteString(req.Proto + " ") + if err != nil { + e.Errors = append(e.Errors, err) + } + + _, err = buf.WriteString(req.UserAgent() + " ") + if err != nil { + e.Errors = append(e.Errors, err) + } + + if level == zerolog.InfoLevel { + _, err = buf.WriteString(color.GreenString(" %d", res.Status)) + if err != nil { + e.Errors = append(e.Errors, err) + } + } else { + _, err = buf.WriteString(color.RedString(" %d", res.Status)) + if err != nil { + e.Errors = append(e.Errors, err) + } + } + + if ctxErr, ok := ctx.Get(types.HttpEndpointErrorKey).(string); ok { + _, err = buf.WriteString(color.YellowString(" %s", ctxErr)) + if err != nil { + e.Errors = append(e.Errors, err) + } + } + + if err != nil { + buf.WriteString(err.Error()) + } + l.zlog.WithLevel(level).Msg(buf.String()) +} diff --git a/telemetry/log.go b/telemetry/log.go index 9b262111..945437f5 100644 --- a/telemetry/log.go +++ b/telemetry/log.go @@ -11,6 +11,8 @@ import ( "sync" "time" + "github.com/containerish/OpenRegistry/config" + fluentbit "github.com/containerish/OpenRegistry/telemetry/fluent-bit" "github.com/containerish/OpenRegistry/types" "github.com/labstack/echo/v4" @@ -21,15 +23,24 @@ import ( type Logger interface { echo.Logger - Log(ctx echo.Context) *zerolog.Event + Log(ctx echo.Context) } -func SetupLogger() zerolog.Logger { +func SetupLogger(env string) zerolog.Logger { zerolog.TimeFieldFormat = zerolog.TimeFormatUnix - zerolog.SetGlobalLevel(zerolog.DebugLevel) - l := zerolog.New(os.Stdout) l = l.With().Caller().Logger() + if env != config.Prod { + zerolog.SetGlobalLevel(zerolog.TraceLevel) + consoleWriter := zerolog.ConsoleWriter{ + Out: os.Stdout, + NoColor: false, + TimeFormat: time.RFC3339, + } + l.Output(consoleWriter) + return l + } + zerolog.SetGlobalLevel(zerolog.DebugLevel) return l } @@ -163,9 +174,10 @@ type logger struct { pool *sync.Pool template *fasttemplate.Template zlog zerolog.Logger + env string } -func ZLogger(baseLogger zerolog.Logger, fluentbitClient fluentbit.FluentBit) Logger { +func ZLogger(fluentbitClient fluentbit.FluentBit, env string) Logger { pool := &sync.Pool{ New: func() interface{} { return bytes.NewBuffer(make([]byte, 256)) @@ -176,17 +188,25 @@ func ZLogger(baseLogger zerolog.Logger, fluentbitClient fluentbit.FluentBit) Log `"status":${status},"error":"${error}","latency":${latency},"latency_human":"${latency_human}"` + `,"bytes_in":${bytes_in},"bytes_out":${bytes_out}}` + "\n" + baseLogger := SetupLogger(env) + return &logger{ zlog: baseLogger, fluentBit: fluentbitClient, output: os.Stdout, pool: pool, template: fasttemplate.New(logFmt, "${", "}"), + env: env, } } //nolint:cyclop // insane amount of complexity because of templating -func (l logger) Log(ctx echo.Context) *zerolog.Event { +func (l logger) Log(ctx echo.Context) { + + if l.env != config.Prod { + l.consoleWriter(ctx) + return + } start, ok := ctx.Get("start").(time.Time) if !ok { @@ -284,7 +304,7 @@ func (l logger) Log(ctx echo.Context) *zerolog.Event { bz := bytes.TrimSpace(buf.Bytes()) l.fluentBit.Send(bz) - return l.zlog.WithLevel(level).RawJSON("msg", bz) + l.zlog.WithLevel(level).RawJSON("msg", bz).Send() } func (l logger) Output() io.Writer {