Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1186 from antsystem/add-client-stream-writer
Browse files Browse the repository at this point in the history
feature: add client stream writer for Streaming
  • Loading branch information
lowzj committed Feb 12, 2020
2 parents 0508f23 + 6692474 commit 790e620
Show file tree
Hide file tree
Showing 9 changed files with 498 additions and 37 deletions.
6 changes: 6 additions & 0 deletions dfget/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,12 @@ type RuntimeVariable struct {
// RealTarget specifies the full target path whose value is equal to the `Output`.
RealTarget string

// StreamMode specifies that all pieces will be written to a Pipe; currently only cdn mode is supported.
// When StreamMode is true, all data will be written directly.
// the mode is prepared for this issue https://github.com/dragonflyoss/Dragonfly/issues/1164
// TODO: support p2p mode
StreamMode bool

// TargetDir is the directory of the RealTarget path.
TargetDir string

Expand Down
52 changes: 52 additions & 0 deletions dfget/core/downloader/back_downloader/back_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/dragonflyoss/Dragonfly/pkg/printer"
"github.com/dragonflyoss/Dragonfly/pkg/stringutils"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -127,6 +128,31 @@ func (bd *BackDownloader) Run(ctx context.Context) error {
return err
}

// RunStream downloads the file directly from the source URL and returns the
// response body as an io.Reader, without any disk io.
//
// It refuses to run (and returns an error) when back-source is disabled via
// cfg.Notbs or when the recorded back-source reason is "no space"; in that
// case cfg.BackSourceReason is incremented by config.ForceNotBackSourceAddition.
//
// On success the returned reader owns the HTTP response body: it closes the
// body by itself as soon as a Read returns an error (including io.EOF), so the
// caller is expected to read the stream to the end.
// When bd.Md5 is not empty, the reader reports an error at end of stream if
// the md5 of the received data does not match.
func (bd *BackDownloader) RunStream(ctx context.Context) (io.Reader, error) {
	var (
		resp *http.Response
		err  error
	)

	if bd.cfg.Notbs || bd.cfg.BackSourceReason == config.BackSourceReasonNoSpace {
		bd.cfg.BackSourceReason += config.ForceNotBackSourceAddition
		err = fmt.Errorf("download fail and not back source: %d", bd.cfg.BackSourceReason)
		return nil, err
	}

	if resp, err = httputils.HTTPGetWithTLS(bd.URL, netutils.ConvertHeaders(bd.cfg.Header), 0, bd.cfg.Cacerts, bd.cfg.Insecure); err != nil {
		return nil, err
	}

	if !bd.isSuccessStatus(resp.StatusCode) {
		// The body is not handed over to the caller on this path, so nobody
		// else can close it; release it here to avoid leaking the connection.
		_ = resp.Body.Close()
		return nil, fmt.Errorf("failed to download from source, response code:%d", resp.StatusCode)
	}

	limitReader := limitreader.NewLimitReader(resp.Body, int64(bd.cfg.LocalLimit), bd.Md5 != "")
	return &autoCloseLimitReader{closer: resp.Body, limitReader: limitReader, md5: bd.Md5}, nil
}

// Cleanup clean all temporary resources generated by executing Run.
func (bd *BackDownloader) Cleanup() {
if bd.cleaned {
Expand All @@ -142,3 +168,29 @@ func (bd *BackDownloader) Cleanup() {
// isSuccessStatus reports whether an HTTP status code from the source is
// acceptable for downloading. Every code below 400 (Bad Request) is accepted,
// so informational and redirection codes count as success as well.
func (bd *BackDownloader) isSuccessStatus(code int) bool {
	return code < http.StatusBadRequest
}

// autoCloseLimitReader is an io.Reader that closes its underlying resource on
// its own as soon as a Read returns an error (io.EOF included).
// This is needed when an http.Response.Body is handed out as a plain
// io.Reader, because the receiver then has no way to close the body itself.
type autoCloseLimitReader struct {
	// closer is closed when limitReader reports an error.
	closer io.Closer
	// md5 is the expected checksum of the whole stream; an empty string
	// disables the check at end of stream.
	md5 string
	// limitReader is the reader the data is actually read from; it also
	// provides the md5 of the data read so far.
	limitReader *limitreader.LimitReader
}

// Read reads from the wrapped limit reader and implements io.Reader.
//
// Whenever the wrapped reader returns an error (including io.EOF) the
// underlying closer is closed. If closing fails, the close error is attached
// to the returned error.
// When the end of the stream is reached and an expected md5 is configured,
// the md5 of the received data is verified and a mismatch is reported as an
// error instead of io.EOF.
func (a *autoCloseLimitReader) Read(p []byte) (n int, err error) {
	n, err = a.limitReader.Read(p)
	if err == nil {
		return n, nil
	}

	// Remember how the stream ended before err is possibly wrapped below:
	// a wrapped error no longer compares equal to io.EOF, which would
	// otherwise skip the md5 verification whenever Close fails.
	reachedEOF := err == io.EOF

	// when return err, always close
	if closeError := a.closer.Close(); closeError != nil {
		err = errors.Wrapf(err, "close error: %s", closeError)
	}

	// all data received, calculate md5
	if reachedEOF && a.md5 != "" {
		realMd5 := a.limitReader.Md5()
		if realMd5 != a.md5 {
			return n, fmt.Errorf("md5 not match, expected: %s real: %s", a.md5, realMd5)
		}
	}
	return n, err
}
47 changes: 47 additions & 0 deletions dfget/core/downloader/back_downloader/back_downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package downloader
import (
"context"
"fmt"
"io"
"io/ioutil"
"math/rand"
"net"
Expand Down Expand Up @@ -102,6 +103,52 @@ func (s *BackDownloaderTestSuite) TestBackDownloader_Run(c *check.C) {
c.Assert(bd.Run(context.TODO()), check.IsNil)
}

// TestBackDownloader_RunStream checks that RunStream refuses to run when
// back-source is not allowed, and that the returned stream reports an md5
// mismatch while a matching md5 reads through cleanly.
//
// The cases share one BackDownloader and one config, so their order matters.
func (s *BackDownloaderTestSuite) TestBackDownloader_RunStream(c *check.C) {
	srcFile := filepath.Join(s.workHome, "download.test")
	wantMd5 := helper.CreateTestFileWithMD5(srcFile, "test downloader")

	cfg := helper.CreateConfig(nil, s.workHome)
	bd := &BackDownloader{
		cfg:    cfg,
		URL:    "http://" + s.host + "/download.test",
		Target: filepath.Join(s.workHome, "back.test"),
	}

	// drain opens a stream, asserts it exists and reads it to the end,
	// returning the error of the read (or of RunStream when nothing was read).
	drain := func() error {
		var (
			stream io.Reader
			err    error
		)
		stream, err = bd.RunStream(context.TODO())
		c.Assert(stream, check.NotNil)
		if stream != nil {
			_, err = ioutil.ReadAll(stream)
		}
		return err
	}

	// back-source is disabled: no stream must be produced
	cfg.Notbs = true
	_, err := bd.RunStream(context.TODO())
	c.Assert(err, check.NotNil)

	// back-source reason is "no space": no stream must be produced
	cfg.Notbs = false
	bd.cleaned = false
	cfg.BackSourceReason = config.BackSourceReasonNoSpace
	stream, err := bd.RunStream(context.TODO())
	c.Assert(stream, check.IsNil)
	c.Assert(err, check.NotNil)

	// NOTE: the cases below only get past the back-source guard because the
	// call above added config.ForceNotBackSourceAddition to
	// cfg.BackSourceReason, so it no longer equals BackSourceReasonNoSpace.

	// test: realMd5 doesn't equal to expectedMd5
	bd.Md5 = "x"
	c.Assert(drain(), check.NotNil)

	// test: realMd5 equals to expectedMd5
	bd.cleaned = false
	bd.Md5 = wantMd5
	c.Assert(drain(), check.IsNil)
}

func (s *BackDownloaderTestSuite) TestBackDownloader_Run_NotExist(c *check.C) {
dst := filepath.Join(s.workHome, "back.test")

Expand Down
3 changes: 3 additions & 0 deletions dfget/core/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package downloader
import (
"context"
"fmt"
"io"
"time"

"github.com/dragonflyoss/Dragonfly/dfget/config"
Expand All @@ -35,6 +36,8 @@ import (
// Downloader is the interface to download files.
type Downloader interface {
	// Run executes the download and reports whether it failed.
	Run(ctx context.Context) error

	// RunStream executes the download as a stream: instead of writing a
	// file it returns an io.Reader to read the data from, without any disk io.
	RunStream(ctx context.Context) (io.Reader, error)

	// Cleanup releases the resources held by the downloader.
	Cleanup()
}

Expand Down
5 changes: 5 additions & 0 deletions dfget/core/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package downloader

import (
"context"
"io"
"io/ioutil"
"os"
"path/filepath"
Expand Down Expand Up @@ -89,5 +90,9 @@ func (md *MockDownloader) Run(ctx context.Context) error {
return nil
}

// RunStream implements Downloader for the mock; it produces neither a stream
// nor an error.
func (md *MockDownloader) RunStream(ctx context.Context) (io.Reader, error) {
	var noStream io.Reader
	return noStream, nil
}

// Cleanup implements Downloader for the mock; there is nothing to release.
func (md *MockDownloader) Cleanup() {}
187 changes: 187 additions & 0 deletions dfget/core/downloader/p2p_downloader/client_stream_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package downloader

import (
"context"
"fmt"
"io"
"time"

"github.com/dragonflyoss/Dragonfly/dfget/config"
"github.com/dragonflyoss/Dragonfly/dfget/core/api"
"github.com/dragonflyoss/Dragonfly/dfget/core/helper"
"github.com/dragonflyoss/Dragonfly/pkg/limitreader"
"github.com/dragonflyoss/Dragonfly/pkg/queue"

"github.com/sirupsen/logrus"
)

// ClientStreamWriter takes downloaded pieces from a queue and writes them, in
// piece order, into an in-memory pipe instead of a file. The written data is
// consumed through Read.
type ClientStreamWriter struct {
	// clientQueue is polled by Run for the items to process: downloaded
	// pieces (*Piece) and control states (strings such as last and reset).
	clientQueue queue.Queue
	// finish is created by PreRun and closed when Run returns; Wait blocks on it.
	finish chan struct{}

	// pieceIndex is the number of the next piece that has to be written to
	// the pipe; it grows by one for every piece written.
	pieceIndex int
	// result is set to true by PreRun and to false after the first failed
	// write; once false, Run skips all further pieces.
	result bool

	// p2pPattern records whether cfg.Pattern is the p2p pattern.
	p2pPattern bool

	// pipeWriter is the write half of the pipe; all piece data is written into it.
	pipeWriter *io.PipeWriter

	// pipeReader is the read half of the pipe.
	pipeReader *io.PipeReader

	// limitReader wraps pipeReader; it supports rate limiting and
	// calculates the md5 of the data read.
	limitReader *limitreader.LimitReader

	// cache keeps, by piece number, the pieces that arrived ahead of
	// pieceIndex until it is their turn to be written.
	cache map[int]*Piece

	// api holds an instance of SupernodeAPI to interact with supernode.
	api api.SupernodeAPI
	cfg *config.Config
}

// NewClientStreamWriter creates a ClientStreamWriter around a fresh in-memory
// pipe. The read half of the pipe is wrapped by a limit reader configured with
// cfg.LocalLimit; its last argument is true only when cfg.Md5 is set.
// PreRun must still be called before the writer is run.
func NewClientStreamWriter(clientQueue queue.Queue, api api.SupernodeAPI, cfg *config.Config) *ClientStreamWriter {
	readHalf, writeHalf := io.Pipe()
	withMd5 := cfg.Md5 != ""

	return &ClientStreamWriter{
		clientQueue: clientQueue,
		pipeReader:  readHalf,
		pipeWriter:  writeHalf,
		limitReader: limitreader.NewLimitReader(readHalf, int64(cfg.LocalLimit), withMd5),
		api:         api,
		cfg:         cfg,
		cache:       make(map[int]*Piece),
	}
}

// PreRun prepares the writer for Run: it records whether the configured
// pattern is p2p, marks the write result as successful and creates the
// channel that signals the end of Run. It always returns nil.
func (csw *ClientStreamWriter) PreRun(ctx context.Context) (err error) {
	csw.p2pPattern = helper.IsP2P(csw.cfg.Pattern)
	csw.result = true
	csw.finish = make(chan struct{})
	return nil
}

// PostRun is the counterpart of PreRun. The stream writer has nothing to do
// after Run, so it always returns nil.
func (csw *ClientStreamWriter) PostRun(ctx context.Context) (err error) {
	err = nil
	return err
}

// Run polls the client queue and writes every received piece into the pipe
// until the last state is received, then closes the pipe and signals Wait.
//
// A reset state cannot be honoured by a stream (data already handed to the
// reader cannot be taken back), so the pipe is closed with an error instead.
// After a failed write the remaining pieces are skipped and the reader is
// given the write error.
// If the last state arrives while pieces are still parked in the re-order
// cache, the stream is incomplete and the reader is given an error as well,
// instead of a clean end of stream.
func (csw *ClientStreamWriter) Run(ctx context.Context) {
	for {
		item := csw.clientQueue.Poll()
		state, ok := item.(string)
		if ok && state == last {
			break
		}
		if ok && state == reset {
			// stream could not reset, just return error
			csw.pipeWriter.CloseWithError(fmt.Errorf("stream writer not support reset"))
			continue
		}
		if !csw.result {
			continue
		}

		piece, ok := item.(*Piece)
		if !ok {
			continue
		}
		if err := csw.write(piece); err != nil {
			logrus.Errorf("write item:%s error:%v", piece, err)
			csw.cfg.BackSourceReason = config.BackSourceReasonWriteError
			csw.result = false
			// Without this the final Close below would let the reader see a
			// clean io.EOF on a truncated stream.
			csw.pipeWriter.CloseWithError(err)
		}
	}

	// Pieces left in the cache were never written because an earlier piece
	// is missing: the data handed to the reader has a gap at pieceIndex.
	if pending := len(csw.cache); pending > 0 {
		csw.pipeWriter.CloseWithError(fmt.Errorf("stream is incomplete: piece %d is missing, %d later pieces not written",
			csw.pieceIndex, pending))
	}

	// Close does not replace an error set by an earlier CloseWithError, so
	// this only produces a clean end of stream when nothing went wrong.
	csw.pipeWriter.Close()
	close(csw.finish)
}

// Wait blocks until Run has finished. It returns immediately when PreRun has
// not been called yet, because there is no finish channel to wait on.
func (csw *ClientStreamWriter) Wait() {
	if csw.finish == nil {
		return
	}
	<-csw.finish
}

// write hands one piece to the pipe and, when that did not fail, reports the
// piece as successful to the supernode in a separate goroutine together with
// the time the write took.
func (csw *ClientStreamWriter) write(piece *Piece) error {
	begin := time.Now()
	// TODO csw.p2pPattern

	if err := csw.writePieceToPipe(piece); err != nil {
		return err
	}

	go sendSuccessPiece(csw.api, csw.cfg.RV.Cid, piece, time.Since(begin))
	return nil
}

// writePieceToPipe writes pieces into the pipe strictly in piece order.
//
// A piece whose number is ahead of pieceIndex is parked in the cache; a piece
// whose number is behind pieceIndex is dropped with a warning. Both cases
// return nil. A piece that is exactly the expected one is copied into the
// pipe, after which every directly following piece already waiting in the
// cache is written as well. Only a failed copy is returned as an error.
func (csw *ClientStreamWriter) writePieceToPipe(p *Piece) error {
	for current := p; ; {
		// must write piece by order
		switch {
		case current.PieceNum < csw.pieceIndex:
			logrus.Warnf("piece number should be greater than %d, received piece number: %d",
				csw.pieceIndex, current.PieceNum)
			return nil
		case current.PieceNum > csw.pieceIndex:
			// not its turn yet, keep it until the pieces before it arrived
			csw.cache[current.PieceNum] = current
			return nil
		}

		if _, err := io.Copy(csw.pipeWriter, current.RawContent()); err != nil {
			return err
		}
		csw.pieceIndex++

		// next piece may be already in cache, check it
		waiting, found := csw.cache[csw.pieceIndex]
		if !found {
			return nil
		}
		delete(csw.cache, csw.pieceIndex)
		current = waiting
	}
}

// Read reads the streamed data from the limit reader and implements io.Reader.
// When the end of the stream is reached and cfg.Md5 is set, the md5 of the
// data read is compared with it and a mismatch is returned as an error in
// place of io.EOF.
func (csw *ClientStreamWriter) Read(p []byte) (n int, err error) {
	n, err = csw.limitReader.Read(p)
	if err != io.EOF || csw.cfg.Md5 == "" {
		return n, err
	}

	// all data received, calculate md5
	if actual := csw.limitReader.Md5(); actual != csw.cfg.Md5 {
		return n, fmt.Errorf("md5 not match, expected: %s real: %s", csw.cfg.Md5, actual)
	}
	return n, err
}
Loading

0 comments on commit 790e620

Please sign in to comment.