Skip to content

Commit

Permalink
Delete assembled chunks after writing them to fs
Browse files Browse the repository at this point in the history
  • Loading branch information
ishank011 committed Oct 30, 2020
1 parent 66cf4ea commit 2fb20d2
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 22 deletions.
16 changes: 12 additions & 4 deletions pkg/storage/fs/ocis/upload.go
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/cs3org/reva/pkg/user"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
tusd "github.com/tus/tusd/pkg/handler"
)

Expand All @@ -56,7 +55,8 @@ func (fs *ocisfs) Upload(ctx context.Context, ref *provider.Reference, r io.Read
return errors.Wrap(err, "ocfs: error checking path")
}
if ok {
p, r, err = fs.chunkHandler.WriteChunk(p, r)
var assembledFile string
p, assembledFile, err = fs.chunkHandler.WriteChunk(p, r)
if err != nil {
return err
}
Expand All @@ -66,8 +66,14 @@ func (fs *ocisfs) Upload(ctx context.Context, ref *provider.Reference, r io.Read
}
return errtypes.PartialContent(ref.String())
}
uploadInfo.info.Storage["InternalDestination"] = p
defer r.Close()
uploadInfo.info.Storage["NodeName"] = p
fd, err := os.Open(assembledFile)
if err != nil {
return errors.Wrap(err, "eos: error opening assembled file")
}
defer fd.Close()
defer os.RemoveAll(assembledFile)
r = fd
}

if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil {
Expand Down Expand Up @@ -364,6 +370,8 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) {
n.ID = uuid.New().String()
}
targetPath := upload.fs.lu.toInternalPath(n.ID)
log := appctx.GetLogger(upload.ctx)
log.Info().Msgf("targetPath: %+v, ID: %+v", targetPath, n.ID)

// if target exists create new version
var fi os.FileInfo
Expand Down
11 changes: 9 additions & 2 deletions pkg/storage/fs/owncloud/upload.go
Expand Up @@ -55,7 +55,8 @@ func (fs *ocfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCl
return errors.Wrap(err, "ocfs: error checking path")
}
if ok {
p, r, err = fs.chunkHandler.WriteChunk(p, r)
var assembledFile string
p, assembledFile, err = fs.chunkHandler.WriteChunk(p, r)
if err != nil {
return err
}
Expand All @@ -66,7 +67,13 @@ func (fs *ocfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCl
return errtypes.PartialContent(ref.String())
}
uploadInfo.info.Storage["InternalDestination"] = p
defer r.Close()
fd, err := os.Open(assembledFile)
if err != nil {
return errors.Wrap(err, "eos: error opening assembled file")
}
defer fd.Close()
defer os.RemoveAll(assembledFile)
r = fd
}

if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil {
Expand Down
17 changes: 5 additions & 12 deletions pkg/storage/utils/chunking/chunking.go
Expand Up @@ -200,29 +200,22 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (bool, string, er

// WriteChunk saves an intermediate chunk temporarily and assembles all chunks
// once the final one is received.
func (c *ChunkHandler) WriteChunk(fn string, r io.ReadCloser) (string, io.ReadCloser, error) {
func (c *ChunkHandler) WriteChunk(fn string, r io.ReadCloser) (string, string, error) {
finish, chunk, err := c.saveChunk(fn, r)
if err != nil {
return "", nil, err
return "", "", err
}

if !finish {
return "", nil, nil
}

fd, err := os.Open(chunk)
if err != nil {
return "", nil, err
return "", "", nil
}

chunkInfo, err := GetChunkBLOBInfo(fn)
if err != nil {
return "", nil, err
return "", "", err
}

// Since we're returning a ReadCloser, it is the responsibility of the
// caller function to close it to prevent file descriptor leaks.
return chunkInfo.Path, fd, nil
return chunkInfo.Path, chunk, nil

// TODO(labkode): implement old chunking

Expand Down
12 changes: 10 additions & 2 deletions pkg/storage/utils/eosfs/upload.go
Expand Up @@ -21,6 +21,7 @@ package eosfs
import (
"context"
"io"
"os"

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/errtypes"
Expand Down Expand Up @@ -52,14 +53,21 @@ func (fs *eosfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadC
return errors.Wrap(err, "eos: error checking path")
}
if ok {
p, r, err = fs.chunkHandler.WriteChunk(p, r)
var assembledFile string
p, assembledFile, err = fs.chunkHandler.WriteChunk(p, r)
if err != nil {
return err
}
if p == "" {
return errtypes.PartialContent(ref.String())
}
defer r.Close()
fd, err := os.Open(assembledFile)
if err != nil {
return errors.Wrap(err, "eos: error opening assembled file")
}
defer fd.Close()
defer os.RemoveAll(assembledFile)
r = fd
}

fn := fs.wrap(ctx, p)
Expand Down
11 changes: 9 additions & 2 deletions pkg/storage/utils/localfs/upload.go
Expand Up @@ -53,7 +53,8 @@ func (fs *localfs) Upload(ctx context.Context, ref *provider.Reference, r io.Rea
return errors.Wrap(err, "ocfs: error checking path")
}
if ok {
p, r, err = fs.chunkHandler.WriteChunk(p, r)
var assembledFile string
p, assembledFile, err = fs.chunkHandler.WriteChunk(p, r)
if err != nil {
return err
}
Expand All @@ -64,7 +65,13 @@ func (fs *localfs) Upload(ctx context.Context, ref *provider.Reference, r io.Rea
return errtypes.PartialContent(ref.String())
}
uploadInfo.info.Storage["InternalDestination"] = p
defer r.Close()
fd, err := os.Open(assembledFile)
if err != nil {
return errors.Wrap(err, "eos: error opening assembled file")
}
defer fd.Close()
defer os.RemoveAll(assembledFile)
r = fd
}

if _, err := uploadInfo.WriteChunk(ctx, 0, r); err != nil {
Expand Down

0 comments on commit 2fb20d2

Please sign in to comment.