Skip to content

Commit

Permalink
refactor: resource host without scheduler v1 definition (#2036)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed Feb 6, 2023
1 parent 5742226 commit fe7b7fd
Show file tree
Hide file tree
Showing 16 changed files with 1,431 additions and 674 deletions.
263 changes: 237 additions & 26 deletions scheduler/resource/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (

"go.uber.org/atomic"

schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1"

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/types"
"d7y.io/dragonfly/v2/scheduler/config"
Expand All @@ -41,6 +39,87 @@ func WithConcurrentUploadLimit(limit int32) HostOption {
}
}

// WithOS sets host's os.
func WithOS(os string) HostOption {
return func(h *Host) *Host {
h.OS = os
return h
}
}

// WithPlatform sets host's platform.
func WithPlatform(platform string) HostOption {
return func(h *Host) *Host {
h.Platform = platform
return h
}
}

// WithPlatformFamily sets host's platform family.
func WithPlatformFamily(platformFamily string) HostOption {
return func(h *Host) *Host {
h.PlatformFamily = platformFamily
return h
}
}

// WithPlatformVersion sets host's platform version.
func WithPlatformVersion(platformVersion string) HostOption {
return func(h *Host) *Host {
h.PlatformVersion = platformVersion
return h
}
}

// WithKernelVersion sets host's kernel version.
func WithKernelVersion(kernelVersion string) HostOption {
return func(h *Host) *Host {
h.KernelVersion = kernelVersion
return h
}
}

// WithCPU sets host's cpu.
func WithCPU(cpu CPU) HostOption {
return func(h *Host) *Host {
h.CPU = cpu
return h
}
}

// WithMemory sets host's memory.
func WithMemory(memory Memory) HostOption {
return func(h *Host) *Host {
h.Memory = memory
return h
}
}

// WithNetwork sets host's network.
func WithNetwork(network Network) HostOption {
return func(h *Host) *Host {
h.Network = network
return h
}
}

// WithDisk sets host's disk.
func WithDisk(disk Disk) HostOption {
return func(h *Host) *Host {
h.Disk = disk
return h
}
}

// WithBuild sets host's build information.
func WithBuild(build Build) HostOption {
return func(h *Host) *Host {
h.Build = build
return h
}
}

// Host contains content for host.
type Host struct {
// ID is host id.
ID string
Expand Down Expand Up @@ -76,19 +155,19 @@ type Host struct {
KernelVersion string

// CPU Stat.
CPU *schedulerv1.CPU
CPU CPU

// Memory Stat.
Memory *schedulerv1.Memory
Memory Memory

// Network Stat.
Network *schedulerv1.Network
Network Network

// Dist Stat.
Disk *schedulerv1.Disk
Disk Disk

// Build information.
Build *schedulerv1.Build
Build Build

// ConcurrentUploadLimit is concurrent upload limit count.
ConcurrentUploadLimit *atomic.Int32
Expand Down Expand Up @@ -118,34 +197,166 @@ type Host struct {
Log *logger.SugaredLoggerOnWith
}

// CPU contains content for cpu.
type CPU struct {
// Number of logical cores in the system.
LogicalCount uint32 `csv:"logicalCount"`

// Number of physical cores in the system.
PhysicalCount uint32 `csv:"physicalCount"`

// Percent calculates the percentage of cpu used.
Percent float64 `csv:"percent"`

// Calculates the percentage of cpu used by process.
ProcessPercent float64 `csv:"processPercent"`

// Times contains the amounts of time the CPU has spent performing different kinds of work.
Times CPUTimes `csv:"times"`
}

// CPUTimes contains content for cpu times.
type CPUTimes struct {
// CPU time of user.
User float64 `csv:"user"`

// CPU time of system.
System float64 `csv:"system"`

// CPU time of idle.
Idle float64 `csv:"idle"`

// CPU time of nice.
Nice float64 `csv:"nice"`

// CPU time of iowait.
Iowait float64 `csv:"iowait"`

// CPU time of irq.
Irq float64 `csv:"irq"`

// CPU time of softirq.
Softirq float64 `csv:"softirq"`

// CPU time of steal.
Steal float64 `csv:"steal"`

// CPU time of guest.
Guest float64 `csv:"guest"`

// CPU time of guest nice.
GuestNice float64 `csv:"guestNice"`
}

// Memory contains content for memory.
type Memory struct {
// Total amount of RAM on this system.
Total uint64 `csv:"total"`

// RAM available for programs to allocate.
Available uint64 `csv:"available"`

// RAM used by programs.
Used uint64 `csv:"used"`

// Percentage of RAM used by programs.
UsedPercent float64 `csv:"usedPercent"`

// Calculates the percentage of memory used by process.
ProcessUsedPercent float64 `csv:"processUsedPercent"`

// This is the kernel's notion of free memory.
Free uint64 `csv:"free"`
}

// Network contains content for network.
type Network struct {
// Return count of tcp connections opened and status is ESTABLISHED.
TCPConnectionCount uint32 `csv:"tcpConnectionCount"`

// Return count of upload tcp connections opened and status is ESTABLISHED.
UploadTCPConnectionCount uint32 `csv:"uploadTCPConnectionCount"`

// Security domain for network.
SecurityDomain string `csv:"securityDomain"`

// Location path(area|country|province|city|...).
Location string `csv:"location"`

// IDC where the peer host is located
IDC string `csv:"idc"`
}

// Build contains content for build.
type Build struct {
// Git version.
GitVersion string `csv:"gitVersion"`

// Git commit.
GitCommit string `csv:"gitCommit"`

// Golang version.
GoVersion string `csv:"goVersion"`

// Build platform.
Platform string `csv:"platform"`
}

// Disk contains content for disk.
type Disk struct {
// Total amount of disk on the data path of dragonfly.
Total uint64 `csv:"total"`

// Free amount of disk on the data path of dragonfly.
Free uint64 `csv:"free"`

// Used amount of disk on the data path of dragonfly.
Used uint64 `csv:"used"`

// Used percent of disk on the data path of dragonfly directory.
UsedPercent float64 `csv:"usedPercent"`

// Total amount of indoes on the data path of dragonfly directory.
InodesTotal uint64 `csv:"inodesTotal"`

// Used amount of indoes on the data path of dragonfly directory.
InodesUsed uint64 `csv:"inodesUsed"`

// Free amount of indoes on the data path of dragonfly directory.
InodesFree uint64 `csv:"inodesFree"`

// Used percent of indoes on the data path of dragonfly directory.
InodesUsedPercent float64 `csv:"inodesUsedPercent"`
}

// New host instance.
func NewHost(req *schedulerv1.AnnounceHostRequest, options ...HostOption) *Host {
func NewHost(
id, ip, hostname string,
port, downloadPort int32, hostType types.HostType,
options ...HostOption,
) *Host {
// Calculate default of the concurrent upload limit by host type.
concurrentUploadLimit := config.DefaultSeedPeerConcurrentUploadLimit
if hostType == types.HostTypeNormal {
concurrentUploadLimit = config.DefaultPeerConcurrentUploadLimit
}

h := &Host{
ID: req.Id,
Type: types.ParseHostType(req.Type),
IP: req.Ip,
Hostname: req.Hostname,
Port: req.Port,
DownloadPort: req.DownloadPort,
OS: req.Os,
Platform: req.Platform,
PlatformFamily: req.PlatformFamily,
PlatformVersion: req.PlatformVersion,
KernelVersion: req.KernelVersion,
CPU: req.Cpu,
Memory: req.Memory,
Network: req.Network,
Disk: req.Disk,
Build: req.Build,
ConcurrentUploadLimit: atomic.NewInt32(config.DefaultPeerConcurrentUploadLimit),
ID: id,
Type: types.HostType(hostType),
IP: ip,
Hostname: hostname,
Port: port,
DownloadPort: downloadPort,
ConcurrentUploadLimit: atomic.NewInt32(int32(concurrentUploadLimit)),
ConcurrentUploadCount: atomic.NewInt32(0),
UploadCount: atomic.NewInt64(0),
UploadFailedCount: atomic.NewInt64(0),
Peers: &sync.Map{},
PeerCount: atomic.NewInt32(0),
CreatedAt: atomic.NewTime(time.Now()),
UpdatedAt: atomic.NewTime(time.Now()),
Log: logger.WithHost(req.Id, req.Hostname, req.Ip),
Log: logger.WithHost(id, hostname, ip),
}

for _, opt := range options {
Expand Down
24 changes: 18 additions & 6 deletions scheduler/resource/host_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ func TestHostManager_Load(t *testing.T) {
gc := gc.NewMockGC(ctl)
tc.mock(gc.EXPECT())

mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
hostManager, err := newHostManager(mockHostGCConfig, gc)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -184,7 +186,9 @@ func TestHostManager_Store(t *testing.T) {
gc := gc.NewMockGC(ctl)
tc.mock(gc.EXPECT())

mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
hostManager, err := newHostManager(mockHostGCConfig, gc)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -235,7 +239,9 @@ func TestHostManager_LoadOrStore(t *testing.T) {
gc := gc.NewMockGC(ctl)
tc.mock(gc.EXPECT())

mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
hostManager, err := newHostManager(mockHostGCConfig, gc)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -288,7 +294,9 @@ func TestHostManager_Delete(t *testing.T) {
gc := gc.NewMockGC(ctl)
tc.mock(gc.EXPECT())

mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
hostManager, err := newHostManager(mockHostGCConfig, gc)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -363,7 +371,9 @@ func TestHostManager_RunGC(t *testing.T) {
},
expect: func(t *testing.T, hostManager HostManager, mockHost *Host, mockPeer *Peer) {
assert := assert.New(t)
mockSeedHost := NewHost(mockRawSeedHost)
mockSeedHost := NewHost(
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)
hostManager.Store(mockSeedHost)
err := hostManager.RunGC()
assert.NoError(err)
Expand All @@ -382,7 +392,9 @@ func TestHostManager_RunGC(t *testing.T) {
gc := gc.NewMockGC(ctl)
tc.mock(gc.EXPECT())

mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
mockPeer := NewPeer(mockPeerID, mockTask, mockHost)
hostManager, err := newHostManager(mockHostGCConfig, gc)
Expand Down

0 comments on commit fe7b7fd

Please sign in to comment.