Skip to content

Commit

Permalink
Merge pull request #114 from containerish/remove-ioReadAll
Browse files Browse the repository at this point in the history
Fix: removed io.ReadAll from entire project
  • Loading branch information
guacamole committed Mar 6, 2022
2 parents 3d9dccc + bfb5354 commit a3dd356
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 57 deletions.
16 changes: 4 additions & 12 deletions auth/signup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package auth
import (
"encoding/json"
"fmt"
"io"
"net/http"
"regexp"
"strings"
Expand Down Expand Up @@ -146,22 +145,15 @@ func (a *auth) SignUp(ctx echo.Context) error {
}()

var u User
bz, err := io.ReadAll(ctx.Request().Body)
if err != nil {
ctx.Set(types.HttpEndpointErrorKey, err.Error())
return ctx.JSON(http.StatusBadRequest, echo.Map{
"error": err.Error(),
"msg": "invalid request body",
})
}
ctx.Request().Body.Close()

if err := json.Unmarshal(bz, &u); err != nil {
if err := json.NewDecoder(ctx.Request().Body).Decode(&u); err != nil {
ctx.Set(types.HttpEndpointErrorKey, err.Error())
return ctx.JSON(http.StatusBadRequest, echo.Map{
"error": err.Error(),
"error": err.Error(),
"message": "error decoding request body in sign-up",
})
}
_ = ctx.Request().Body.Close()

if err := u.Validate(a.store); err != nil {
ctx.Set(types.HttpEndpointErrorKey, err.Error())
Expand Down
16 changes: 11 additions & 5 deletions registry/v2/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,18 @@ func (b *blobs) UploadBlob(ctx echo.Context) error {
return ctx.JSONBlob(http.StatusBadRequest, errMsg)
}

bz, _ := io.ReadAll(ctx.Request().Body)
defer ctx.Request().Body.Close()
buf := &bytes.Buffer{}
if _, err := io.Copy(buf, ctx.Request().Body); err != nil {
return ctx.JSON(http.StatusBadRequest, echo.Map{
"error": err.Error(),
"message": "error copying request body in upload blob",
})
}

b.uploads[uuid] = bz
_ = ctx.Request().Body.Close()
b.uploads[uuid] = buf.Bytes()

if err := b.blobTransaction(ctx, bz, uuid); err != nil {
if err := b.blobTransaction(ctx, buf.Bytes(), uuid); err != nil {
errMsg := b.errorResponse(
RegistryErrorCodeBlobUploadInvalid,
err.Error(),
Expand All @@ -111,7 +117,7 @@ func (b *blobs) UploadBlob(ctx echo.Context) error {

locationHeader := fmt.Sprintf("/v2/%s/blobs/uploads/%s", namespace, uuid)
ctx.Response().Header().Set("Location", locationHeader)
ctx.Response().Header().Set("Range", fmt.Sprintf("0-%d", len(bz)-1))
ctx.Response().Header().Set("Range", fmt.Sprintf("0-%d", len(buf.Bytes())-1))
return ctx.NoContent(http.StatusAccepted)
}

Expand Down
97 changes: 57 additions & 40 deletions registry/v2/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ func (r *registry) PullManifest(ctx echo.Context) error {
ctx.Set(types.HttpEndpointErrorKey, errMsg)
return ctx.JSONBlob(http.StatusNotFound, errMsg)
}

resp, err := r.skynet.Download(manifest.Skylink)
if err != nil {
errMsg := r.errorResponse(RegistryErrorCodeManifestInvalid, err.Error(), nil)
Expand All @@ -246,7 +245,6 @@ func (r *registry) PullManifest(ctx echo.Context) error {
return ctx.JSONBlob(http.StatusNotFound, errMsg)
}
_ = resp.Close()

ctx.Response().Header().Set("Docker-Content-Digest", manifest.Digest)
ctx.Response().Header().Set("X-Docker-Content-ID", manifest.Skylink)
ctx.Response().Header().Set("Content-Type", manifest.MediaType)
Expand Down Expand Up @@ -294,16 +292,15 @@ func (r *registry) PullLayer(ctx echo.Context) error {
ctx.Set(types.HttpEndpointErrorKey, errMsg)
return ctx.JSONBlob(http.StatusNotFound, errMsg)
}

bz, err := io.ReadAll(resp)
if err != nil {
buf := &bytes.Buffer{}
if _, err := io.Copy(buf, resp); err != nil {
errMsg := r.errorResponse(RegistryErrorCodeBlobUploadInvalid, err.Error(), nil)
ctx.Set(types.HttpEndpointErrorKey, errMsg)
return ctx.JSONBlob(http.StatusInternalServerError, errMsg)
}
_ = resp.Close()

dig := digest(bz)
dig := digest(buf.Bytes())
if dig != clientDigest {
details := map[string]interface{}{
"clientDigest": clientDigest,
Expand All @@ -318,9 +315,9 @@ func (r *registry) PullLayer(ctx echo.Context) error {
return ctx.JSONBlob(http.StatusNotFound, errMsg)
}

ctx.Response().Header().Set("Content-Length", fmt.Sprintf("%d", len(bz)))
ctx.Response().Header().Set("Content-Length", fmt.Sprintf("%d", len(buf.Bytes())))
ctx.Response().Header().Set("Docker-Content-Digest", dig)
return ctx.Blob(http.StatusOK, "application/octet-stream", bz)
return ctx.Blob(http.StatusOK, "application/octet-stream", buf.Bytes())
}

// MonolithicUpload
Expand All @@ -330,31 +327,31 @@ func (r *registry) MonolithicUpload(ctx echo.Context) error {
uuid := ctx.Param("uuid")
digest := ctx.QueryParam("digest")

bz, err := io.ReadAll(ctx.Request().Body)
if err != nil {
buf := &bytes.Buffer{}
if _, err := io.Copy(buf, ctx.Request().Body); err != nil {
errMsg := r.errorResponse(RegistryErrorCodeBlobUploadInvalid, err.Error(), nil)
ctx.Set(types.HttpEndpointErrorKey, errMsg)
return ctx.JSONBlob(http.StatusBadRequest, errMsg)
}
ctx.Request().Body.Close()
_ = ctx.Request().Body.Close()

link, err := r.skynet.Upload(namespace, digest, bz, true)
link, err := r.skynet.Upload(namespace, digest, buf.Bytes(), true)
if err != nil {
detail := echo.Map{
"error": err.Error(),
"caller": "MonolithicUpload",
}
errMsg := r.errorResponse(RegistryErrorCodeBlobUploadInvalid, err.Error(), detail)
ctx.Set(types.HttpEndpointErrorKey, errMsg)
return ctx.JSONBlob(http.StatusInternalServerError, bz)
return ctx.JSONBlob(http.StatusInternalServerError, buf.Bytes())
}

metadata := types.Metadata{
Namespace: namespace,
Manifest: types.ImageManifest{
SchemaVersion: 2,
MediaType: "",
Layers: []*types.Layer{{MediaType: "", Size: len(bz), Digest: digest, SkynetLink: link, UUID: uuid}},
Layers: []*types.Layer{{MediaType: "", Size: len(buf.Bytes()), Digest: digest, SkynetLink: link, UUID: uuid}},
},
}

Expand Down Expand Up @@ -392,8 +389,8 @@ func (r *registry) StartUpload(ctx echo.Context) error {
clientDigest := ctx.QueryParam("digest")

if clientDigest != "" {
bz, err := io.ReadAll(ctx.Request().Body)
if err != nil {
buf := &bytes.Buffer{}
if _, err := io.Copy(buf, ctx.Request().Body); err != nil {
details := map[string]interface{}{
"clientDigest": clientDigest,
"namespace": namespace,
Expand All @@ -407,8 +404,8 @@ func (r *registry) StartUpload(ctx echo.Context) error {
ctx.Set(types.HttpEndpointErrorKey, errMsg)
return ctx.JSONBlob(http.StatusNotFound, errMsg)
}
ctx.Request().Body.Close() // why defer? body is already read :)
dig := digest(bz)
_ = ctx.Request().Body.Close() // why defer? body is already read :)
dig := digest(buf.Bytes())

if dig != clientDigest {
details := map[string]interface{}{
Expand All @@ -425,7 +422,7 @@ func (r *registry) StartUpload(ctx echo.Context) error {
return ctx.JSONBlob(http.StatusBadRequest, errMsg)
}

skylink, err := r.skynet.Upload(namespace, dig, bz, true)
skylink, err := r.skynet.Upload(namespace, dig, buf.Bytes(), true)
if err != nil {
errMsg := r.errorResponse(RegistryErrorCodeBlobUploadInvalid, err.Error(), nil)
ctx.Set(types.HttpEndpointErrorKey, errMsg)
Expand All @@ -439,7 +436,7 @@ func (r *registry) StartUpload(ctx echo.Context) error {
SkynetLink: skylink,
UUID: uuid.NewString(),
BlobDigests: nil,
Size: len(bz),
Size: len(buf.Bytes()),
}

txnOp, err := r.store.NewTxn(ctx.Request().Context())
Expand Down Expand Up @@ -540,30 +537,30 @@ func (r *registry) CompleteUpload(ctx echo.Context) error {
namespace := ctx.Param("username") + "/" + ctx.Param("imagename")
id := ctx.Param("uuid")

bz, err := io.ReadAll(ctx.Request().Body)
if err != nil {
buf := &bytes.Buffer{}
if _, err := io.Copy(buf, ctx.Request().Body); err != nil {
errMsg := r.errorResponse(RegistryErrorCodeDigestInvalid, err.Error(), nil)
ctx.Set(types.HttpEndpointErrorKey, errMsg)
return ctx.JSONBlob(http.StatusBadRequest, errMsg)
}
_ = ctx.Request().Body.Close()
// insert if bz is not nil
buf := bytes.NewBuffer(r.b.uploads[id])
buf.Write(bz)
ourHash := digest(buf.Bytes())
ubuf := bytes.NewBuffer(r.b.uploads[id])
ubuf.Write(buf.Bytes())
ourHash := digest(ubuf.Bytes())
delete(r.b.uploads, id)

if ourHash != dig {
details := map[string]interface{}{
"headerDigest": dig, "serverSideDigest": ourHash, "bodyDigest": digest(bz),
"headerDigest": dig, "serverSideDigest": ourHash, "bodyDigest": digest(buf.Bytes()),
}
errMsg := r.errorResponse(RegistryErrorCodeDigestInvalid, "digest mismatch", details)
ctx.Set(types.HttpEndpointErrorKey, errMsg)
return ctx.JSONBlob(http.StatusBadRequest, errMsg)
}

blobNamespace := fmt.Sprintf("%s/blobs", namespace)
skylink, err := r.skynet.Upload(blobNamespace, dig, buf.Bytes(), true)
skylink, err := r.skynet.Upload(blobNamespace, dig, ubuf.Bytes(), true)
if err != nil {
errMsg := r.errorResponse(RegistryErrorCodeBlobUploadInvalid, err.Error(), nil)
ctx.Set(types.HttpEndpointErrorKey, errMsg)
Expand All @@ -577,7 +574,7 @@ func (r *registry) CompleteUpload(ctx echo.Context) error {
SkynetLink: skylink,
UUID: id,
BlobDigests: txnOp.blobDigests,
Size: len(bz),
Size: len(buf.Bytes()),
}
if !ok {
errMsg := r.errorResponse(RegistryErrorCodeUnknown, "transaction does not exist for uuid -"+id, nil)
Expand Down Expand Up @@ -623,26 +620,33 @@ func (r *registry) PushManifest(ctx echo.Context) error {
namespace := ctx.Param("username") + "/" + ctx.Param("imagename")
ref := ctx.Param("reference")
contentType := ctx.Request().Header.Get("Content-Type")
ctx.Set(types.HandlerStartTime, time.Now())
defer func() {
r.logger.Log(ctx).Send()
}()

bz, err := io.ReadAll(ctx.Request().Body)
var manifest ImageManifest

buf := &bytes.Buffer{}
_, err := io.Copy(buf, ctx.Request().Body)
if err != nil {
errMsg := r.errorResponse(RegistryErrorCodeManifestInvalid, err.Error(), nil)
ctx.Set(types.HttpEndpointErrorKey, errMsg)
return ctx.JSONBlob(http.StatusBadRequest, errMsg)
return ctx.JSON(http.StatusBadRequest, echo.Map{
"error": err.Error(),
"message": "failed in push manifest while io Copy",
})
}
ctx.Request().Body.Close()

dig := digest(bz)
_ = ctx.Request().Body.Close()

var manifest ImageManifest
if err = json.Unmarshal(bz, &manifest); err != nil {
err = json.Unmarshal(buf.Bytes(), &manifest)
if err != nil {
errMsg := r.errorResponse(RegistryErrorCodeBlobUnknown, err.Error(), nil)
ctx.Set(types.HttpEndpointErrorKey, errMsg)
return ctx.JSONBlob(http.StatusBadRequest, errMsg)
}
dig := digest(buf.Bytes())

mfNamespace := fmt.Sprintf("%s/manifests", namespace)
skylink, err := r.skynet.Upload(mfNamespace, dig, bz, true)
skylink, err := r.skynet.Upload(mfNamespace, dig, buf.Bytes(), true)
if err != nil {
errMsg := r.errorResponse(RegistryErrorCodeManifestBlobUnknown, err.Error(), nil)
ctx.Set(types.HttpEndpointErrorKey, errMsg)
Expand Down Expand Up @@ -673,7 +677,15 @@ func (r *registry) PushManifest(ctx echo.Context) error {
SchemaVersion: 2,
}

txnOp, _ := r.store.NewTxn(context.Background())
txnOp, err := r.store.NewTxn(context.Background())
if err != nil {
errMsg := r.errorResponse(RegistryErrorCodeUnknown, err.Error(), echo.Map{
"reason": "PG_ERR_CREATE_NEW_TXN",
})
ctx.Set(types.HttpEndpointErrorKey, errMsg)
_ = r.store.Abort(ctx.Request().Context(), txnOp)
return ctx.JSONBlob(http.StatusInternalServerError, errMsg)
}

if err := r.store.SetManifest(ctx.Request().Context(), txnOp, val); err != nil {
errMsg := r.errorResponse(RegistryErrorCodeUnknown, err.Error(), nil)
Expand All @@ -690,7 +702,12 @@ func (r *registry) PushManifest(ctx echo.Context) error {
}

if err = r.store.Commit(ctx.Request().Context(), txnOp); err != nil {
return err
errMsg := r.errorResponse(RegistryErrorCodeUnknown, err.Error(), echo.Map{
"reason": "ERR_PG_COMMIT_TXN",
})
ctx.Set(types.HttpEndpointErrorKey, errMsg)
_ = r.store.Abort(ctx.Request().Context(), txnOp)
return ctx.JSONBlob(http.StatusInternalServerError, errMsg)
}

locationHeader := r.getHttpUrlFromSkylink(skylink)
Expand Down

0 comments on commit a3dd356

Please sign in to comment.