Skip to content

Commit

Permalink
feat: support delayed download of files under feed-server control
Browse files Browse the repository at this point in the history
  • Loading branch information
fireyun committed Jun 27, 2024
1 parent e413e24 commit 130d8d5
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 1 deletion.
119 changes: 119 additions & 0 deletions examples/download-file-bench/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/

// pull file example for bscp sdk
package main

import (
"os"
"path"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/TencentBlueKing/bscp-go/internal/downloader"
"golang.org/x/exp/slog"

"github.com/TencentBlueKing/bscp-go/client"
"github.com/TencentBlueKing/bscp-go/internal/constant"
"github.com/TencentBlueKing/bscp-go/pkg/logger"
)

func main() {
// 设置日志等级为debug
level := logger.GetLevelByName("debug")
logger.SetLevel(level)

// 设置日志自定义 Handler
// logger.SetHandler(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{}))

// 在线服务, 可设置 metrics
// metrics.RegisterMetrics()
// http.Handle("/metrics", promhttp.Handler())

// 初始化配置信息, 按需修改
bizStr := os.Getenv("BSCP_BIZ")
biz, err := strconv.ParseInt(bizStr, 10, 64)
if err != nil {
slog.Error("parse BSCP_BIZ", logger.ErrAttr(err))
os.Exit(1)
}

clientOpts := []client.Option{
client.WithFeedAddrs(strings.Split(os.Getenv("BSCP_FEED_ADDRS"), ",")),
client.WithBizID(uint32(biz)),
client.WithToken(os.Getenv("BSCP_TOKEN")),
}
if os.Getenv("BSCP_ENABLE_FILE_CACHE") != "" {
clientOpts = append(clientOpts, client.WithFileCache(client.FileCache{
Enabled: true,
CacheDir: constant.DefaultFileCacheDir,
ThresholdGB: constant.DefaultCacheThresholdGB,
}))
}

bscp, err := client.New(clientOpts...)
if err != nil {
slog.Error("init client", logger.ErrAttr(err))
os.Exit(1)
}

appName := os.Getenv("BSCP_APP")
opts := []client.AppOption{}
release, err := bscp.PullFiles(appName, opts...)
if err != nil {
slog.Error("pull app files failed", logger.ErrAttr(err))
os.Exit(1)
}

start := time.Now()

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
if err = downloadAppFiles(release); err != nil {
slog.Error("download app files failed", logger.ErrAttr(err))
}
}()
}
wg.Wait()

costTime := time.Since(start).Seconds()
slog.Info("download app files finished", slog.Int64("success", success), slog.Int64("success", success),
slog.Float64("cost_time_seconds", costTime))

// 持续阻塞,便于观察对比客户端进程负载状况
select {}
}

var success, fail int64

// downloadAppFiles 下载服务文件
func downloadAppFiles(release *client.Release) error {
for _, c := range release.FileItems {
bytes := make([]byte, c.FileMeta.ContentSpec.ByteSize)

if err := downloader.GetDownloader().Download(c.FileMeta.PbFileMeta(), c.FileMeta.RepositoryPath,
c.FileMeta.ContentSpec.ByteSize, downloader.DownloadToBytes, bytes, ""); err != nil {
atomic.AddInt64(&fail, 1)
return err
}
atomic.AddInt64(&success, 1)
logger.Debug("get file content by downloading from repo success", slog.String("file", path.Join(c.Path, c.Name)))
}

return nil
}
17 changes: 16 additions & 1 deletion internal/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ type execDownload struct {
header http.Header
downloadUri string
fileSize uint64
waitTimeMil int64
}

func (exec *execDownload) do() error {
Expand Down Expand Up @@ -257,6 +258,7 @@ func (exec *execDownload) do() error {
Err: fmt.Errorf("get temporary download url failed, err: %s", err.Error())})
}
exec.downloadUri = resp.Url
exec.waitTimeMil = resp.WaitTimeMil

Check failure on line 261 in internal/downloader/downloader.go

View workflow job for this annotation

GitHub Actions / lint

resp.WaitTimeMil undefined (type *pbfs.GetDownloadURLResp has no field or method WaitTimeMil) (typecheck)

Check failure on line 261 in internal/downloader/downloader.go

View workflow job for this annotation

GitHub Actions / lint

resp.WaitTimeMil undefined (type *pbfs.GetDownloadURLResp has no field or method WaitTimeMil)) (typecheck)

Check failure on line 261 in internal/downloader/downloader.go

View workflow job for this annotation

GitHub Actions / lint

resp.WaitTimeMil undefined (type *pbfs.GetDownloadURLResp has no field or method WaitTimeMil)) (typecheck)

Check failure on line 261 in internal/downloader/downloader.go

View workflow job for this annotation

GitHub Actions / build

resp.WaitTimeMil undefined (type *pbfs.GetDownloadURLResp has no field or method WaitTimeMil)
if exec.fileSize <= exec.dl.balanceDownloadByteSize {
// the file size is not big enough, download directly
if e := exec.downloadDirectlyWithRetry(); e != nil {
Expand Down Expand Up @@ -355,6 +357,14 @@ func (exec *execDownload) isProviderSupportRangeDownload() (uint64, bool, error)

// downloadDirectlyWithRetry download file directly with retry
func (exec *execDownload) downloadDirectlyWithRetry() error {
logger.Debug("start download file directly",
slog.String("file", path.Join(exec.fileMeta.ConfigItemSpec.Path, exec.fileMeta.ConfigItemSpec.Name)),
slog.Int64("waitTimeMil", exec.waitTimeMil))
// wait before downloading, used for traffic control, avoid file storage service overload
if exec.waitTimeMil > 0 {
time.Sleep(time.Millisecond * time.Duration(exec.waitTimeMil))
}

// do download with retry
retry := tools.NewRetryPolicy(1, [2]uint{500, 10000})
maxRetryCount := 5
Expand Down Expand Up @@ -402,7 +412,12 @@ func (exec *execDownload) downloadDirectly(timeoutSeconds int) error {

func (exec *execDownload) downloadWithRange() error {
logger.Info("start download file with range",
slog.String("file", path.Join(exec.fileMeta.ConfigItemSpec.Path, exec.fileMeta.ConfigItemSpec.Name)))
slog.String("file", path.Join(exec.fileMeta.ConfigItemSpec.Path, exec.fileMeta.ConfigItemSpec.Name)),
slog.Int64("waitTimeMil", exec.waitTimeMil))
// wait before downloading, used for traffic control, avoid file storage service overload
if exec.waitTimeMil > 0 {
time.Sleep(time.Millisecond * time.Duration(exec.waitTimeMil))
}

var start, end uint64
batchSize := 2 * exec.dl.balanceDownloadByteSize
Expand Down

0 comments on commit 130d8d5

Please sign in to comment.