Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
feature: supernode support download remote file by skipping secure ve…
Browse files Browse the repository at this point in the history
…rify or setting root ca certs.

Signed-off-by: zhouchencheng <zhouchencheng@bilibili.com>
  • Loading branch information
zcc35357949 committed Aug 9, 2019
1 parent e659d7f commit 9dace51
Show file tree
Hide file tree
Showing 21 changed files with 1,645 additions and 1,345 deletions.
2,443 changes: 1,227 additions & 1,216 deletions apis/swagger.yml

Large diffs are not rendered by default.

27 changes: 27 additions & 0 deletions apis/types/task_register_request.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions cmd/dfget/app/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ func initFlags() {

flagSet.StringVar(&cfg.CallSystem, "callsystem", "",
"The name of dfget caller which is for debugging. Once set, it will be passed to all components around the request to make debugging easy")
flagSet.StringSliceVar(&cfg.Cacerts, "cacerts", nil,
"The cacert file which is used to verify remote server when supernode interact with the source.")
flagSet.StringVarP(&cfg.Pattern, "pattern", "p", "p2p",
"download pattern, must be p2p/cdn/source, cdn and source do not support flag --totallimit")
flagSet.StringVarP(&filter, "filter", "f", "",
Expand All @@ -224,6 +226,8 @@ func initFlags() {
"disable back source downloading for requested file when p2p fails to download it")
flagSet.BoolVar(&cfg.DFDaemon, "dfdaemon", false,
"identify whether the request is from dfdaemon")
flagSet.BoolVar(&cfg.Insecure, "insecure", false,
"identify whether supernode should skip secure verify when interact with the source.")
flagSet.IntVar(&cfg.ClientQueueSize, "clientqueue", config.DefaultClientQueueSize,
"specify the size of client queue which controls the number of pieces that can be processed simultaneously")

Expand Down
27 changes: 16 additions & 11 deletions dfdaemon/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,22 +147,27 @@ func (p *Properties) DFGetConfig() DFGetConfig {
dfgetFlags = append(dfgetFlags, "--verbose")
}

return DFGetConfig{
dfgetConfig := DFGetConfig{
DfgetFlags: dfgetFlags,
SuperNodes: p.SuperNodes,
RateLimit: p.RateLimit,
DFRepo: p.DFRepo,
DFPath: p.DFPath,
}
if p.HijackHTTPS != nil {
dfgetConfig.HostsConfig = p.HijackHTTPS.Hosts
}
return dfgetConfig
}

// DFGetConfig configures how dfdaemon calls dfget
type DFGetConfig struct {
DfgetFlags []string `yaml:"dfget_flags"`
SuperNodes []string `yaml:"supernodes"`
RateLimit string `yaml:"ratelimit"`
DFRepo string `yaml:"localrepo"`
DFPath string `yaml:"dfpath"`
DfgetFlags []string `yaml:"dfget_flags"`
SuperNodes []string `yaml:"supernodes"`
RateLimit string `yaml:"ratelimit"`
DFRepo string `yaml:"localrepo"`
DFPath string `yaml:"dfpath"`
HostsConfig []*HijackHost `yaml:"hosts" json:"hosts"`
}

// RegistryMirror configures the mirror of the official docker registry
Expand Down Expand Up @@ -261,7 +266,7 @@ func (u *URL) MarshalYAML() (interface{}, error) {
// CertPool is a wrapper around x509.CertPool, which can be unmarshalled and
// constructed from a list of filenames
type CertPool struct {
files []string
Files []string
*x509.CertPool
}

Expand All @@ -276,11 +281,11 @@ func (cp *CertPool) UnmarshalJSON(b []byte) error {
}

func (cp *CertPool) unmarshal(unmarshal func(interface{}) error) error {
if err := unmarshal(&cp.files); err != nil {
if err := unmarshal(&cp.Files); err != nil {
return err
}

pool, err := certPoolFromFiles(cp.files...)
pool, err := certPoolFromFiles(cp.Files...)
if err != nil {
return err
}
Expand All @@ -291,12 +296,12 @@ func (cp *CertPool) unmarshal(unmarshal func(interface{}) error) error {

// MarshalJSON implements json.Marshaller to print the cert pool
func (cp *CertPool) MarshalJSON() ([]byte, error) {
return json.Marshal(cp.files)
return json.Marshal(cp.Files)
}

// MarshalYAML implements yaml.Marshaller to print the cert pool
func (cp *CertPool) MarshalYAML() (interface{}, error) {
return cp.files, nil
return cp.Files, nil
}

// Regexp is simple wrapper around regexp.Regexp to make it unmarshallable from a string
Expand Down
17 changes: 15 additions & 2 deletions dfdaemon/downloader/dfget/dfget.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@ package dfget

import (
"fmt"
netUrl "net/url"
"os/exec"
"path/filepath"
"strings"
"syscall"
"time"

log "github.com/sirupsen/logrus"

"github.com/dragonflyoss/Dragonfly/dfdaemon/config"
"github.com/dragonflyoss/Dragonfly/dfdaemon/constant"
"github.com/dragonflyoss/Dragonfly/dfdaemon/exception"

log "github.com/sirupsen/logrus"
)

// DFGetter implements Downloader to download file by dragonfly
Expand Down Expand Up @@ -92,5 +93,17 @@ func (dfGetter *DFGetter) getCommand(
}
}

urlInfo, _ := netUrl.Parse(url)
for _, h := range dfGetter.config.HostsConfig {
if urlInfo != nil && h.Regx.MatchString(urlInfo.Host) {
if h.Insecure {
args = append(args, "--insecure")
}
if h.Certs != nil && len(h.Certs.Files) != 0 {
add("--cacerts", strings.Join(h.Certs.Files, ","))
}
}
}

return exec.Command(dfGetter.config.DFPath, args...)
}
6 changes: 6 additions & 0 deletions dfget/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ type Config struct {
// default:`p2p`.
Pattern string `json:"pattern,omitempty"`

// CA certificate to verify when supernode interact with the source.
Cacerts []string `json:"cacert,omitempty"`

// Filter filter some query params of url, use char '&' to separate different params.
// eg: -f 'key&sign' will filter 'key' and 'sign' query param.
// in this way, different urls correspond one same download task that can use p2p mode.
Expand All @@ -185,6 +188,9 @@ type Config struct {
// DFDaemon indicates whether the caller is from dfdaemon
DFDaemon bool `json:"dfdaemon,omitempty"`

// Insecure indicates whether skip secure verify when supernode interact with the source.
Insecure bool `json:"insecure,omitempty"`

// Version show version.
Version bool `json:"version,omitempty"`

Expand Down
12 changes: 12 additions & 0 deletions dfget/core/regist/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package regist

import (
"io/ioutil"
"os"
"time"

Expand Down Expand Up @@ -139,12 +140,23 @@ func (s *supernodeRegister) constructRegisterRequest(port int) *types.RegisterRe
CallSystem: cfg.CallSystem,
Headers: cfg.Header,
Dfdaemon: cfg.DFDaemon,
Insecure: cfg.Insecure,
}
if cfg.Md5 != "" {
req.Md5 = cfg.Md5
} else if cfg.Identifier != "" {
req.Identifier = cfg.Identifier
}

for _, certPath := range cfg.Cacerts {
caBytes, err := ioutil.ReadFile(certPath)
if err != nil {
logrus.Errorf("read cert file fail:%v", err)
continue
}
req.RootCAs = append(req.RootCAs, caBytes)
}

return req
}

Expand Down
2 changes: 2 additions & 0 deletions dfget/types/register_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type RegisterRequest struct {
CallSystem string `json:"callSystem,omitempty"`
Headers []string `json:"headers,omitempty"`
Dfdaemon bool `json:"dfdaemon,omitempty"`
Insecure bool `json:"insecure,omitempty"`
RootCAs [][]byte `json:"rootCAs,omitempty"`
}

func (r *RegisterRequest) String() string {
Expand Down
63 changes: 0 additions & 63 deletions pkg/httputils/http_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
"time"

"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
"github.com/dragonflyoss/Dragonfly/pkg/netutils"
"github.com/dragonflyoss/Dragonfly/pkg/stringutils"
"github.com/dragonflyoss/Dragonfly/pkg/util"

"github.com/pkg/errors"
Expand Down Expand Up @@ -293,67 +291,6 @@ func CheckConnect(ip string, port int, timeout int) (localIP string, e error) {
return
}

// IsExpired checks if a resource received or stored is the same.
func IsExpired(url string, headers map[string]string, lastModified int64, eTag string) (bool, error) {
if lastModified <= 0 && stringutils.IsEmptyStr(eTag) {
return true, nil
}

// set headers
if headers == nil {
headers = make(map[string]string)
}
if lastModified > 0 {
lastModifiedStr, _ := netutils.ConvertTimeIntToString(lastModified)
headers["If-Modified-Since"] = lastModifiedStr
}
if !stringutils.IsEmptyStr(eTag) {
headers["If-None-Match"] = eTag
}

// send request
resp, err := HTTPGetTimeout(url, headers, 4*time.Second)
if err != nil {
return false, err
}
resp.Body.Close()

return resp.StatusCode != http.StatusNotModified, nil
}

// IsSupportRange checks if the source url support partial requests.
func IsSupportRange(url string, headers map[string]string) (bool, error) {
// set headers
if headers == nil {
headers = make(map[string]string)
}
headers["Range"] = "bytes=0-0"

// send request
resp, err := HTTPGetTimeout(url, headers, 4*time.Second)
if err != nil {
return false, err
}
resp.Body.Close()

if resp.StatusCode == http.StatusPartialContent {
return true, nil
}
return false, nil
}

// GetContentLength send a head request to get file length.
func GetContentLength(url string, headers map[string]string) (int64, int, error) {
// send request
resp, err := HTTPGetTimeout(url, headers, 4*time.Second)
if err != nil {
return 0, 0, err
}
resp.Body.Close()

return resp.ContentLength, resp.StatusCode, nil
}

// ConstructRangeStr wrap the rangeStr as a HTTP Range header value.
func ConstructRangeStr(rangeStr string) string {
return fmt.Sprintf("bytes=%s", rangeStr)
Expand Down
10 changes: 6 additions & 4 deletions supernode/daemon/mgr/cdn/cache_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"context"

"github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/pkg/httputils"
"github.com/dragonflyoss/Dragonfly/pkg/stringutils"
"github.com/dragonflyoss/Dragonfly/supernode/httpclient"
"github.com/dragonflyoss/Dragonfly/supernode/store"

"github.com/sirupsen/logrus"
Expand All @@ -14,12 +14,14 @@ import (
type cacheDetector struct {
cacheStore *store.Store
metaDataManager *fileMetaDataManager
OriginClient httpclient.OriginHTTPClient
}

func newCacheDetector(cacheStore *store.Store, metaDataManager *fileMetaDataManager) *cacheDetector {
func newCacheDetector(cacheStore *store.Store, metaDataManager *fileMetaDataManager, originClient httpclient.OriginHTTPClient) *cacheDetector {
return &cacheDetector{
cacheStore: cacheStore,
metaDataManager: metaDataManager,
OriginClient: originClient,
}
}

Expand Down Expand Up @@ -50,7 +52,7 @@ func (cd *cacheDetector) detectCache(ctx context.Context, task *types.TaskInfo)
}

func (cd *cacheDetector) parseBreakNum(ctx context.Context, task *types.TaskInfo, metaData *fileMetaData) int {
expired, err := httputils.IsExpired(task.RawURL, task.Headers, metaData.LastModified, metaData.ETag)
expired, err := cd.OriginClient.IsExpired(task.RawURL, task.Headers, metaData.LastModified, metaData.ETag)
if err != nil {
logrus.Errorf("failed to check whether the task(%s) has expired: %v", task.ID, err)
}
Expand All @@ -67,7 +69,7 @@ func (cd *cacheDetector) parseBreakNum(ctx context.Context, task *types.TaskInfo
return 0
}

supportRange, err := httputils.IsSupportRange(task.TaskURL, task.Headers)
supportRange, err := cd.OriginClient.IsSupportRange(task.TaskURL, task.Headers)
if err != nil {
logrus.Errorf("failed to check whether the task(%s) supports partial requests: %v", task.ID, err)
}
Expand Down
16 changes: 1 addition & 15 deletions supernode/daemon/mgr/cdn/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cdn

import (
"context"
"fmt"
"net/http"

errorType "github.com/dragonflyoss/Dragonfly/pkg/errortypes"
Expand Down Expand Up @@ -36,18 +35,5 @@ func (cm *Manager) download(ctx context.Context, taskID, url string, headers map
}

logrus.Infof("start to download for taskId(%s) with fileUrl: %s header: %v checkCode: %d", taskID, url, headers, checkCode)
return getWithURL(url, headers, checkCode)
}

func getWithURL(url string, headers map[string]string, checkCode int) (*http.Response, error) {
// TODO: add timeout
resp, err := httputils.HTTPGet(url, headers)
if err != nil {
return nil, err
}

if resp.StatusCode == checkCode {
return resp, nil
}
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
return cm.originClient.Download(url, headers, checkCode)
}
Loading

0 comments on commit 9dace51

Please sign in to comment.