Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
feature: add client stream writer for Streaming
Browse files Browse the repository at this point in the history
Signed-off-by: 楚贤 <chuxian.mjj@antfin.com>
  • Loading branch information
jim3ma committed Jan 16, 2020
1 parent e7332a3 commit 3c375d4
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 17 deletions.
1 change: 1 addition & 0 deletions cmd/dfget/app/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ func initFlags() {
"caching duration for which cached file keeps no accessed by any process, after this period cache file will be deleted")
flagSet.DurationVar(&cfg.RV.ServerAliveTime, "alivetime", config.ServerAliveTime,
"alive duration for which uploader keeps no accessing by any uploading requests, after this period uploader will automatically exit")
flagSet.BoolVar(&cfg.RV.StreamMode, "stream", false, "enable stream mode")

flagSet.MarkDeprecated("exceed", "please use '--timeout' or '-e' instead")
}
Expand Down
6 changes: 6 additions & 0 deletions dfget/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,12 @@ type RuntimeVariable struct {
// RealTarget specifies the full target path whose value is equal to the `Output`.
RealTarget string

// StreamMode specifies that all pieces will be written to a pipe; currently only cdn mode is supported.
// When StreamMode is true, all data will be written directly.
// the mode is prepared for this issue https://github.com/dragonflyoss/Dragonfly/issues/1164
// TODO: support p2p mode
StreamMode bool

// TargetDir is the directory of the RealTarget path.
TargetDir string

Expand Down
185 changes: 185 additions & 0 deletions dfget/core/downloader/p2p_downloader/client_stream_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package downloader

import (
"context"
"fmt"
"io"
"time"

"github.com/dragonflyoss/Dragonfly/dfget/config"
"github.com/dragonflyoss/Dragonfly/dfget/core/api"
"github.com/dragonflyoss/Dragonfly/dfget/core/helper"
"github.com/dragonflyoss/Dragonfly/dfget/types"
"github.com/dragonflyoss/Dragonfly/pkg/queue"

"github.com/sirupsen/logrus"
)

// ClientStreamWriter consumes downloaded pieces from a queue and writes them,
// in piece-number order, into an in-memory pipe. The read half of the pipe is
// exposed through the Read method, so the writer can be used as an io.Reader.
type ClientStreamWriter struct {
// clientQueue is the queue Run polls for work. Items are either *Piece
// values to be written to the pipe, or string control markers
// (Run checks for the package-level `last` and `reset` values).
clientQueue queue.Queue
// finish is closed by Run when its loop has ended; Wait blocks on it.
finish chan struct{}

// syncQueue is not referenced anywhere in this file.
// NOTE(review): looks like a leftover from ClientWriter — confirm before removing.
syncQueue queue.Queue
// pieceIndex is the number of the next piece that must be written to the pipe.
// It is incremented after each piece is copied into the pipe.
pieceIndex int
// result records whether the write operation was successful.
// Run sets it to false on the first write error and then skips further pieces.
result bool

// p2pPattern records whether the pattern equals "p2p".
p2pPattern bool

// pipeWriter is the write half of the pipe; all piece data is written into it.
pipeWriter *io.PipeWriter

// pipeReader is the read half of the pipe; Read delegates to it.
pipeReader *io.PipeReader

// cache holds pieces that arrived ahead of pieceIndex, keyed by piece number,
// until every preceding piece has been written.
cache map[int]*Piece

// api holds an instance of SupernodeAPI to interact with supernode.
api api.SupernodeAPI
// cfg is the dfget configuration; Run records a back-source reason on it
// when a write fails.
cfg *config.Config
}

// NewClientStreamWriter builds a ClientStreamWriter that polls clientQueue and
// streams pieces through a freshly created in-memory pipe. It returns an error
// only if the writer's init step fails.
func NewClientStreamWriter(clientQueue queue.Queue, api api.SupernodeAPI, cfg *config.Config) (*ClientStreamWriter, error) {
	reader, writer := io.Pipe()

	csw := &ClientStreamWriter{
		cfg:         cfg,
		api:         api,
		clientQueue: clientQueue,
		pipeWriter:  writer,
		pipeReader:  reader,
	}

	err := csw.init()
	if err != nil {
		return nil, err
	}
	return csw, nil
}

// init prepares the writer's internal state before Run is started: it derives
// p2pPattern from the configured pattern, marks the result as successful so
// far, creates the finish channel that Wait blocks on, and allocates the
// out-of-order piece cache.
//
// The cache must be allocated here: writePieceToPipe stores early pieces in
// it, and assigning to a nil map panics.
func (csw *ClientStreamWriter) init() (err error) {
	csw.p2pPattern = helper.IsP2P(csw.cfg.Pattern)
	csw.result = true
	csw.finish = make(chan struct{})
	csw.cache = make(map[int]*Piece)
	return nil
}

// Run polls clientQueue and streams every received piece into the pipe until
// the `last` marker is polled. When the loop ends it closes the write half of
// the pipe and closes the finish channel so that Wait returns.
//
// A `reset` marker cannot be honoured by a stream (bytes already handed to the
// reader cannot be taken back), so the pipe is closed with an error instead.
// When writing a piece fails, the failure is logged, recorded as the
// back-source reason, and propagated to the reader through the pipe; all
// later pieces are skipped.
//
// ctx is currently not consulted; the loop only ends on the `last` marker.
func (csw *ClientStreamWriter) Run(ctx context.Context) {
	for {
		item := csw.clientQueue.Poll()

		// String items are control markers, never piece data.
		if state, ok := item.(string); ok {
			if state == last {
				break
			}
			if state == reset {
				// stream could not reset, just return error
				csw.pipeWriter.CloseWithError(fmt.Errorf("stream writer not support reset"))
			}
			continue
		}

		// A previous write failed: keep draining the queue, but write nothing.
		if !csw.result {
			continue
		}

		piece, ok := item.(*Piece)
		if !ok {
			continue
		}
		if err := csw.write(piece); err != nil {
			logrus.Errorf("write item:%s error:%v", piece, err)
			csw.cfg.BackSourceReason = config.BackSourceReasonWriteError
			csw.result = false
			// Surface the failure to the reader. Without this the final
			// Close below would signal a clean EOF on a truncated stream.
			// CloseWithError keeps the first error, so it is harmless when
			// the pipe has already been closed.
			csw.pipeWriter.CloseWithError(err)
		}
	}

	// No-op if the pipe was already closed with an error above.
	csw.pipeWriter.Close()
	close(csw.finish)
}

// Wait blocks until Run has closed the finish channel. It returns immediately
// when the finish channel was never created.
func (csw *ClientStreamWriter) Wait() {
	if csw.finish == nil {
		return
	}
	<-csw.finish
}

// write hands one piece to writePieceToPipe and, when that succeeds, reports
// the piece in a separate goroutine together with the time the write took.
// It returns the error from writePieceToPipe unchanged.
func (csw *ClientStreamWriter) write(piece *Piece) error {
	begin := time.Now()
	// TODO csw.p2pPattern

	if err := csw.writePieceToPipe(piece); err != nil {
		return err
	}

	// The elapsed time is computed here, before the goroutine starts.
	go csw.sendSuccessPiece(piece, time.Since(begin))
	return nil
}

// writePieceToPipe writes p into the pipe if it is the next expected piece,
// and then flushes any directly following pieces that were cached earlier.
//
// Pieces must reach the pipe strictly in order:
//   - a piece numbered below pieceIndex is rejected with an error;
//   - a piece numbered above pieceIndex is parked in the cache and nil is
//     returned without writing anything;
//   - a piece numbered exactly pieceIndex is copied into the pipe, after
//     which pieceIndex advances and the cache is checked for the next piece.
//
// It returns the first error produced while copying into the pipe.
func (csw *ClientStreamWriter) writePieceToPipe(p *Piece) error {
	for {
		if p.PieceNum < csw.pieceIndex {
			return fmt.Errorf("piece number should great than %d", csw.pieceIndex)
		}

		if p.PieceNum > csw.pieceIndex {
			// Arrived early: keep it until its predecessors are written.
			// Guard against a nil map, which would panic on assignment.
			if csw.cache == nil {
				csw.cache = make(map[int]*Piece)
			}
			csw.cache[p.PieceNum] = p
			return nil
		}

		if _, err := io.Copy(csw.pipeWriter, p.RawContent()); err != nil {
			return err
		}
		csw.pieceIndex++

		// The next piece may already be in the cache; if so, write it too.
		next, ok := csw.cache[csw.pieceIndex]
		if !ok {
			return nil
		}
		// Drop the entry so a flushed piece's content is not retained for
		// the rest of the download.
		delete(csw.cache, csw.pieceIndex)
		p = next
	}
}

// sendSuccessPiece reports a piece to the supernode it came from and logs the
// operation when cost exceeds two seconds.
//
// It is started in its own goroutine by write, so it must never panic: the
// destination cid is truncated to at most 25 bytes for the log line instead
// of being sliced unconditionally, which would panic on a shorter cid.
func (csw *ClientStreamWriter) sendSuccessPiece(piece *Piece, cost time.Duration) {
	csw.api.ReportPiece(piece.SuperNode, &types.ReportPieceRequest{
		TaskID:     piece.TaskID,
		Cid:        csw.cfg.RV.Cid,
		DstCid:     piece.DstCid,
		PieceRange: piece.Range,
	})

	if cost.Seconds() > 2.0 {
		const maxCidLogLen = 25
		dst := piece.DstCid
		if len(dst) > maxCidLogLen {
			dst = dst[:maxCidLogLen]
		}
		logrus.Infof(
			"async writer and report suc from dst:%s... cost:%.3f for range:%s",
			dst, cost.Seconds(), piece.Range)
	}
}

// Read reads piece data from the read half of the pipe into p, which lets a
// ClientStreamWriter be used as an io.Reader. The byte count and error are
// those returned by the underlying pipe reader.
func (csw *ClientStreamWriter) Read(p []byte) (n int, err error) {
	n, err = csw.pipeReader.Read(p)
	return n, err
}
9 changes: 8 additions & 1 deletion dfget/core/downloader/p2p_downloader/client_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ import (
"github.com/sirupsen/logrus"
)

// RunWaiter is the writer abstraction used by the p2p downloader: something
// that can be started with Run and whose completion can be awaited with Wait.
// NewClientWriter returns its writer as a RunWaiter.
type RunWaiter interface {
// Run starts the writer with the given context.
Run(ctx context.Context)
// Wait waits for the writer started by Run to finish.
Wait()
}

// ClientWriter writes a file for uploading and a target file.
type ClientWriter struct {
// clientQueue maintains a queue of tasks that need to be written to disk.
Expand Down Expand Up @@ -75,7 +82,7 @@ type ClientWriter struct {

// NewClientWriter creates and initialize a ClientWriter instance.
func NewClientWriter(clientFilePath, serviceFilePath string,
clientQueue queue.Queue, api api.SupernodeAPI, cfg *config.Config) (*ClientWriter, error) {
clientQueue queue.Queue, api api.SupernodeAPI, cfg *config.Config) (RunWaiter, error) {
clientWriter := &ClientWriter{
clientQueue: clientQueue,
clientFilePath: clientFilePath,
Expand Down
61 changes: 45 additions & 16 deletions dfget/core/downloader/p2p_downloader/p2p_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"math/rand"
"os"
"strconv"
Expand Down Expand Up @@ -86,6 +87,10 @@ type P2PDownloader struct {
// always ends with ".service".
serviceFilePath string

// streamMode indicates that piece data is sent into a pipe;
// this is useful when using dfget as a library
streamMode bool

// pieceSet range -> bool
// true: if the range is processed successfully
// false: if the range is in processing
Expand Down Expand Up @@ -129,6 +134,7 @@ func (p2p *P2PDownloader) init() {
p2p.node = p2p.RegisterResult.Node
p2p.taskID = p2p.RegisterResult.TaskID
p2p.targetFile = p2p.cfg.RV.RealTarget
p2p.streamMode = p2p.cfg.RV.StreamMode
p2p.taskFileName = p2p.cfg.RV.TaskFileName

p2p.pieceSizeHistory[0], p2p.pieceSizeHistory[1] =
Expand All @@ -153,15 +159,36 @@ func (p2p *P2PDownloader) Run(ctx context.Context) error {
var (
lastItem *Piece
goNext bool
err error
)

// start ClientWriter
clientWriter, err := NewClientWriter(p2p.clientFilePath, p2p.serviceFilePath, p2p.clientQueue, p2p.API, p2p.cfg)
// start RunWaiter
var rw RunWaiter

if p2p.streamMode {
// TODO switching to stream mode is first step to enhance dfget as a library
// hard code here will be changed later
rw, err = NewClientStreamWriter(p2p.clientQueue, p2p.API, p2p.cfg)
go func() {
file, err := fileutils.OpenFile(p2p.clientFilePath, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0755)
if err != nil {
return
}
defer file.Close()
_, err = io.Copy(file, rw.(io.Reader))
if err != nil {
return
}
}()
} else {
rw, err = NewClientWriter(p2p.clientFilePath, p2p.serviceFilePath, p2p.clientQueue, p2p.API, p2p.cfg)
}

if err != nil {
return err
}
go func() {
clientWriter.Run(ctx)
rw.Run(ctx)
}()

for {
Expand All @@ -186,7 +213,7 @@ func (p2p *P2PDownloader) Run(ctx context.Context) error {
if code == constants.CodePeerContinue {
p2p.processPiece(response, &curItem)
} else if code == constants.CodePeerFinish {
p2p.finishTask(response, clientWriter)
p2p.finishTask(response, rw)
return nil
} else {
logrus.Warnf("request piece result:%v", response)
Expand Down Expand Up @@ -425,12 +452,12 @@ func (p2p *P2PDownloader) processPiece(response *types.PullPieceTaskResponse,
}
}

func (p2p *P2PDownloader) finishTask(response *types.PullPieceTaskResponse, clientWriter *ClientWriter) {
func (p2p *P2PDownloader) finishTask(response *types.PullPieceTaskResponse, rw RunWaiter) {
// wait client writer finished
logrus.Infof("remaining piece to be written count:%d", p2p.clientQueue.Len())
p2p.clientQueue.Put(last)
waitStart := time.Now()
clientWriter.Wait()
rw.Wait()
logrus.Infof("wait client writer finish cost:%.3f,main qu size:%d,client qu size:%d",
time.Since(waitStart).Seconds(), p2p.queue.Len(), p2p.clientQueue.Len())

Expand All @@ -439,18 +466,20 @@ func (p2p *P2PDownloader) finishTask(response *types.PullPieceTaskResponse, clie
}

// get the temp path where the downloaded file exists.
var src string
if clientWriter.acrossWrite || !helper.IsP2P(p2p.cfg.Pattern) {
src = p2p.cfg.RV.TempTarget
} else {
if _, err := os.Stat(p2p.clientFilePath); err != nil {
logrus.Warnf("client file path:%s not found", p2p.clientFilePath)
if e := fileutils.Link(p2p.serviceFilePath, p2p.clientFilePath); e != nil {
logrus.Warnln("hard link failed, instead of use copy")
fileutils.CopyFile(p2p.serviceFilePath, p2p.clientFilePath)
src := p2p.clientFilePath
// TODO need optimise for stream mode
if cw, ok := rw.(*ClientWriter); ok {
if cw.acrossWrite || !helper.IsP2P(p2p.cfg.Pattern) {
src = p2p.cfg.RV.TempTarget
} else {
if _, err := os.Stat(p2p.clientFilePath); err != nil {
logrus.Warnf("client file path:%s not found", p2p.clientFilePath)
if e := fileutils.Link(p2p.serviceFilePath, p2p.clientFilePath); e != nil {
logrus.Warnln("hard link failed, instead of use copy")
fileutils.CopyFile(p2p.serviceFilePath, p2p.clientFilePath)
}
}
}
src = p2p.clientFilePath
}

// move file to the target file path.
Expand Down

0 comments on commit 3c375d4

Please sign in to comment.