Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat(api): use new storagev2 interfaces #3833

Merged
merged 5 commits into from Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
365 changes: 110 additions & 255 deletions pkg/api/api.go

Large diffs are not rendered by default.

235 changes: 109 additions & 126 deletions pkg/api/api_test.go

Large diffs are not rendered by default.

109 changes: 49 additions & 60 deletions pkg/api/bytes.go
Expand Up @@ -16,10 +16,8 @@ import (
"github.com/ethersphere/bee/pkg/cac"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/sctx"
"github.com/ethersphere/bee/pkg/storage"
storage "github.com/ethersphere/bee/pkg/storagev2"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/ethersphere/bee/pkg/util/ioutil"
"github.com/gorilla/mux"
Expand All @@ -34,15 +32,42 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
logger := tracing.NewLoggerWithTraceID(r.Context(), s.logger.WithName("post_bytes").Build())

headers := struct {
ContentType string `map:"Content-Type" validate:"excludes=multipart/form-data"`
SwarmTag string `map:"Swarm-Tag"`
BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"`
SwarmTag uint64 `map:"Swarm-Tag"`
Pin bool `map:"Swarm-Pin"`
Deferred bool `map:"Swarm-Deferred-Upload"`
Encrypt bool `map:"Swarm-Encrypt"`
}{}
if response := s.mapStructure(r.Header, &headers); response != nil {
response("invalid header params", logger, w)
return
}

putter, wait, err := s.newStamperPutter(r)
var (
tag uint64
err error
)
if headers.Deferred || headers.Pin {
tag, err = s.getOrCreateSessionID(headers.SwarmTag)
if err != nil {
logger.Debug("get or create tag failed", "error", err)
logger.Error(nil, "get or create tag failed")
switch {
case errors.Is(err, storage.ErrNotFound):
jsonhttp.NotFound(w, "tag not found")
default:
jsonhttp.InternalServerError(w, "cannot get or create tag")
}
return
}
}

putter, err := s.newStamperPutter(r.Context(), putterOptions{
BatchID: headers.BatchID,
TagID: tag,
Pin: headers.Pin,
Deferred: headers.Deferred,
})
if err != nil {
logger.Debug("get putter failed", "error", err)
logger.Error(nil, "get putter failed")
Expand All @@ -61,36 +86,14 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
return
}

tag, created, err := s.getOrCreateTag(headers.SwarmTag)
if err != nil {
logger.Debug("get or create tag failed", "error", err)
logger.Error(nil, "get or create tag failed")
switch {
case errors.Is(err, tags.ErrNotFound):
jsonhttp.NotFound(w, "tag not found")
default:
jsonhttp.InternalServerError(w, "cannot get or create tag")
}
return
}

if !created {
// only in the case when tag is sent via header (i.e. not created by this request)
if estimatedTotalChunks := requestCalculateNumberOfChunks(r); estimatedTotalChunks > 0 {
err = tag.IncN(tags.TotalChunks, estimatedTotalChunks)
if err != nil {
logger.Debug("increment tag failed", "error", err)
logger.Error(nil, "increment tag failed")
jsonhttp.InternalServerError(w, "increment tag failed")
return
}
}
ow := &cleanupOnErrWriter{
ResponseWriter: w,
onErr: putter.Cleanup,
logger: logger,
}

// Add the tag to the context
ctx := sctx.SetTag(r.Context(), tag)
p := requestPipelineFn(putter, r)
ctx, cancel := context.WithCancel(ctx)
p := requestPipelineFn(putter, headers.Encrypt)
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
pr := ioutil.TimeoutReader(ctx, r.Body, time.Minute, func(n uint64) {
logger.Error(nil, "idle read timeout exceeded")
Expand All @@ -103,39 +106,23 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
logger.Error(nil, "split write all failed")
switch {
case errors.Is(err, postage.ErrBucketFull):
jsonhttp.PaymentRequired(w, "batch is overissued")
jsonhttp.PaymentRequired(ow, "batch is overissued")
default:
jsonhttp.InternalServerError(w, "split write all failed")
jsonhttp.InternalServerError(ow, "split write all failed")
}
return
}
if err = wait(); err != nil {
logger.Debug("sync chunks failed", "error", err)
logger.Error(nil, "sync chunks failed")
jsonhttp.InternalServerError(w, "sync chunks failed")
return
}

if created {
_, err = tag.DoneSplit(address)
if err != nil {
logger.Debug("done split failed", "error", err)
logger.Error(nil, "done split failed")
jsonhttp.InternalServerError(w, "done split filed")
return
}
err = putter.Done(address)
if err != nil {
logger.Debug("done split failed", "error", err)
logger.Error(nil, "done split failed")
jsonhttp.InternalServerError(ow, "done split filed")
aloknerurkar marked this conversation as resolved.
Show resolved Hide resolved
}

if requestPin(r) {
if err := s.pinning.CreatePin(ctx, address, false); err != nil {
logger.Debug("pin creation failed", "address", address, "error", err)
logger.Error(nil, "pin creation failed")
jsonhttp.InternalServerError(w, "create ping failed")
return
}
if tag != 0 {
w.Header().Set(SwarmTagHeader, fmt.Sprint(tag))
}

w.Header().Set(SwarmTagHeader, fmt.Sprint(tag.Uid))
w.Header().Set("Access-Control-Expose-Headers", SwarmTagHeader)
jsonhttp.Created(w, bytesPostResponse{
Reference: address,
Expand Down Expand Up @@ -172,7 +159,9 @@ func (s *Service) bytesHeadHandler(w http.ResponseWriter, r *http.Request) {
return
}

ch, err := s.storer.Get(r.Context(), storage.ModeGetRequest, paths.Address)
getter := s.storer.Download(true)

ch, err := getter.Get(r.Context(), paths.Address)
if err != nil {
logger.Debug("get root chunk failed", "chunk_address", paths.Address, "error", err)
logger.Error(nil, "get rook chunk failed")
Expand Down