Skip to content

Commit

Permalink
feat: add task manager (#490)
Browse files Browse the repository at this point in the history
* feat: add task manager

Signed-off-by: Gaius <gaius.qi@gmail.com>

Co-authored-by: zzy987 <67889264+zzy987@users.noreply.github.com>
  • Loading branch information
gaius-qi and zzy987 committed Aug 3, 2021
1 parent ea0ae4f commit 86620ca
Show file tree
Hide file tree
Showing 32 changed files with 2,101 additions and 213 deletions.
39 changes: 5 additions & 34 deletions client/daemon/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
"d7y.io/dragonfly/v2/pkg/util/net/httputils"
)

var _ *logger.SugaredLoggerOnWith // pin this package for no log code generation
Expand Down Expand Up @@ -167,13 +168,13 @@ func (rt *transport) download(req *http.Request) (*http.Response, error) {
}

// Pick header's parameters
filter := pickHeader(req.Header, config.HeaderDragonflyFilter, rt.defaultFilter)
tag := pickHeader(req.Header, config.HeaderDragonflyBiz, rt.defaultBiz)
filter := httputils.PickHeader(req.Header, config.HeaderDragonflyFilter, rt.defaultFilter)
tag := httputils.PickHeader(req.Header, config.HeaderDragonflyBiz, rt.defaultBiz)

// Delete hop-by-hop headers
delHopHeaders(req.Header)

meta.Header = headerToMap(req.Header)
meta.Header = httputils.HeaderToMap(req.Header)
meta.Tag = tag
meta.Filter = filter

Expand All @@ -198,7 +199,7 @@ func (rt *transport) download(req *http.Request) (*http.Response, error) {
return nil, err
}

hdr := mapToHeader(attr)
hdr := httputils.MapToHeader(attr)
log.Infof("download stream attribute: %v", hdr)

resp := &http.Response{
Expand Down Expand Up @@ -245,39 +246,9 @@ var hopHeaders = []string{
"Upgrade",
}

// headerToMap coverts request headers to map[string]string.
func headerToMap(header http.Header) map[string]string {
m := make(map[string]string)
for k, v := range header {
// TODO only use first value currently
m[k] = v[0]
}
return m
}

// mapToHeader coverts map[string]string to request headers.
func mapToHeader(m map[string]string) http.Header {
var h = http.Header{}
for k, v := range m {
h.Set(k, v)
}
return h
}

// delHopHeaders delete hop-by-hop headers.
func delHopHeaders(header http.Header) {
for _, h := range hopHeaders {
header.Del(h)
}
}

// pickHeader pick header with key.
func pickHeader(header http.Header, key, defaultValue string) string {
v := header.Get(key)
if v != "" {
header.Del(key)
return v
}

return defaultValue
}
133 changes: 0 additions & 133 deletions client/daemon/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,136 +72,3 @@ func TestTransport_RoundTrip(t *testing.T) {
}
assert.Equal(testData, output)
}

func TestTransport_headerToMap(t *testing.T) {
tests := []struct {
name string
header http.Header
expect func(t *testing.T, data interface{})
}{
{
name: "normal conversion",
header: http.Header{
"foo": {"foo"},
"bar": {"bar"},
},
expect: func(t *testing.T, data interface{}) {
assert := testifyassert.New(t)
assert.EqualValues(data, map[string]string{
"foo": "foo",
"bar": "bar",
})
},
},
{
name: "header is empty",
header: http.Header{},
expect: func(t *testing.T, data interface{}) {
assert := testifyassert.New(t)
assert.EqualValues(data, map[string]string{})
},
},
{
name: "header is a nested array",
header: http.Header{
"foo": {"foo1", "foo2"},
"bar": {"bar"},
},
expect: func(t *testing.T, data interface{}) {
assert := testifyassert.New(t)
assert.EqualValues(data, map[string]string{
"foo": "foo1",
"bar": "bar",
})
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
data := headerToMap(tc.header)
tc.expect(t, data)
})
}
}

func TestTransport_mapToHeader(t *testing.T) {
tests := []struct {
name string
m map[string]string
expect func(t *testing.T, data interface{})
}{
{
name: "normal conversion",
m: map[string]string{
"Foo": "foo",
"Bar": "bar",
},
expect: func(t *testing.T, data interface{}) {
assert := testifyassert.New(t)
assert.EqualValues(data, http.Header{
"Foo": {"foo"},
"Bar": {"bar"},
})
},
},
{
name: "map is empty",
m: map[string]string{},
expect: func(t *testing.T, data interface{}) {
assert := testifyassert.New(t)
assert.EqualValues(data, http.Header{})
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
data := mapToHeader(tc.m)
tc.expect(t, data)
})
}
}

func TestTransport_pickHeader(t *testing.T) {
tests := []struct {
name string
header http.Header
key string
defaultValue string
expect func(t *testing.T, data string, header http.Header)
}{
{
name: "Pick the existing key",
header: http.Header{
"Foo": {"foo"},
"Bar": {"bar"},
},
key: "Foo",
defaultValue: "",
expect: func(t *testing.T, data string, header http.Header) {
assert := testifyassert.New(t)
assert.Equal("foo", data)
assert.Equal("", header.Get("Foo"))
},
},
{
name: "Pick the non-existent key",
header: http.Header{},
key: "Foo",
defaultValue: "bar",
expect: func(t *testing.T, data string, header http.Header) {
assert := testifyassert.New(t)
assert.Equal(data, "bar")
assert.Equal(header.Get("Foo"), "")
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
data := pickHeader(tc.header, tc.key, tc.defaultValue)
tc.expect(t, data, tc.header)
})
}
}
5 changes: 5 additions & 0 deletions cmd/scheduler/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ for deciding which peers transmit blocks to each other.`,
return err
}

// Convert redis host config
if err := cfg.Convert(); err != nil {
return err
}

return runScheduler()
},
}
Expand Down
10 changes: 6 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.15

require (
github.com/HuKeping/rbtree v0.0.0-20210106022122-8ad34838eb2b
github.com/RichardKnop/machinery v1.10.6
github.com/VividCortex/mysqlerr v1.0.0
github.com/agiledragon/gomonkey v2.0.2+incompatible
github.com/aliyun/aliyun-oss-go-sdk v2.1.6+incompatible
Expand All @@ -12,20 +13,22 @@ require (
github.com/casbin/casbin/v2 v2.34.1
github.com/casbin/gorm-adapter/v3 v3.3.2
github.com/colinmarc/hdfs/v2 v2.2.0
github.com/docker/distribution v2.7.1+incompatible
github.com/docker/go-units v0.4.0
github.com/emirpasic/gods v1.12.0
github.com/envoyproxy/protoc-gen-validate v0.6.1
github.com/gin-gonic/gin v1.7.2
github.com/go-echarts/statsview v0.3.4
github.com/go-http-utils/headers v0.0.0-20181008091004-fed159eddc2a
github.com/go-playground/validator/v10 v10.4.1
github.com/go-redis/cache/v8 v8.4.1
github.com/go-redis/redis/v8 v8.10.0
github.com/go-sql-driver/mysql v1.5.0
github.com/gofrs/flock v0.8.0
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.2
github.com/google/uuid v1.1.5
github.com/google/uuid v1.2.0
github.com/gorilla/mux v1.7.3
github.com/jarcoal/httpmock v1.0.8
github.com/klauspost/compress v1.13.1 // indirect
Expand All @@ -38,13 +41,14 @@ require (
github.com/olekukonko/tablewriter v0.0.5
github.com/onsi/ginkgo v1.16.4
github.com/onsi/gomega v1.14.0
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.1 // indirect
github.com/pborman/uuid v1.2.1
github.com/pelletier/go-toml v1.8.1 // indirect
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.7.1 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/schollz/progressbar/v3 v3.8.2
github.com/serialx/hashring v0.0.0-20200727003509-22c0c7ab6b1b
github.com/sirupsen/logrus v1.4.2
Expand All @@ -64,7 +68,6 @@ require (
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324
golang.org/x/tools v0.1.4 // indirect
gonum.org/v1/gonum v0.9.3
google.golang.org/genproto v0.0.0-20201110150050-8816d57aaa9a // indirect
google.golang.org/grpc v1.36.0
google.golang.org/protobuf v1.26.0
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
Expand All @@ -73,7 +76,6 @@ require (
gorm.io/datatypes v1.0.1
gorm.io/driver/mysql v1.0.5
gorm.io/gorm v1.21.9
honnef.co/go/tools v0.0.1-2020.1.3 // indirect
k8s.io/apimachinery v0.20.6 // indirect
k8s.io/client-go v11.0.0+incompatible
)

0 comments on commit 86620ca

Please sign in to comment.