From ed5c58ce7c74f127aaed7e24a0a32d4fd7ac0f32 Mon Sep 17 00:00:00 2001 From: Adam Bozanich Date: Wed, 18 Jul 2018 00:14:33 -0700 Subject: [PATCH] provider/cluster: close unsuccessful deployments refs #215 --- _run/kube/deployment.yml | 6 +- _run/kube/run.sh | 2 +- _run/multi/run.sh | 2 +- _run/single/run.sh | 2 +- app/market/mocks/client.go | 2 +- app/market/mocks/engine.go | 2 +- app/market/mocks/facilitator.go | 2 +- cmd/akash/query_test.go | 4 + cmd/akash/session/base.go | 1 + cmd/akash/session/flags.go | 2 +- cmd/akash/session/session.go | 2 +- marketplace/mocks/handler.go | 2 +- provider/cluster/client.go | 91 ++++++++-- provider/cluster/kube/apply.go | 9 +- provider/cluster/kube/builder.go | 12 +- provider/cluster/kube/client.go | 100 +++++++++-- provider/cluster/kube/client_test.go | 5 - provider/cluster/kube/mocks/client.go | 39 +++-- provider/cluster/manager.go | 231 ++++++++++++++++++++++++ provider/cluster/mocks/client.go | 39 +++-- provider/cluster/mocks/cluster.go | 2 +- provider/cluster/mocks/deployment.go | 2 +- provider/cluster/mocks/reservation.go | 2 +- provider/cluster/monitor.go | 242 +++++++++++++------------- provider/cluster/service.go | 47 ++--- provider/cluster/service_test.go | 24 ++- provider/event/tx.go | 14 +- provider/manifest/mocks/handler.go | 2 +- provider/service.go | 2 +- query/mocks/client.go | 2 +- txutil/mocks/client.go | 2 +- 31 files changed, 654 insertions(+), 242 deletions(-) create mode 100644 provider/cluster/manager.go diff --git a/_run/kube/deployment.yml b/_run/kube/deployment.yml index 0acc8c80f2..11ee7bc373 100644 --- a/_run/kube/deployment.yml +++ b/_run/kube/deployment.yml @@ -14,9 +14,9 @@ services: profiles: compute: web: - cpu: 1 - memory: 512 - disk: 5 + cpu: 0.1 + memory: 512Mi + disk: 5Gi placement: westcoast: attributes: diff --git a/_run/kube/run.sh b/_run/kube/run.sh index 6ab126ab71..b437164887 100755 --- a/_run/kube/run.sh +++ b/_run/kube/run.sh @@ -38,7 +38,7 @@ case "$1" in akash_provider provider run "$(cat "$DATA_ROOT/master.dc")" -k master --kube ;; deploy) - akash deployment create deployment.yml -k master -w + akash deployment create deployment.yml -k master ;; manifest) akash deployment sendmani deployment.yml "$2" -k master diff --git a/_run/multi/run.sh b/_run/multi/run.sh index e00a7a5448..13df881e6c 100755 --- a/_run/multi/run.sh +++ b/_run/multi/run.sh @@ -29,7 +29,7 @@ case "$1" in akash marketplace ;; deploy) - akash deployment create deployment.yml -k master -w + akash deployment create deployment.yml -k master ;; *) echo "USAGE: $0 " >&2 diff --git a/_run/single/run.sh b/_run/single/run.sh index 3d8b62955e..3d1ae08585 100755 --- a/_run/single/run.sh +++ b/_run/single/run.sh @@ -38,7 +38,7 @@ case "$1" in akash_provider provider run "$(cat "$DATA_ROOT/master.dc")" -k master ;; deploy) - akash deployment create ../deployment.yml -k master -w + akash deployment create ../deployment.yml -k master ;; *) echo "USAGE: $0 " >&2 diff --git a/app/market/mocks/client.go b/app/market/mocks/client.go index f23ac94d21..8cd7d5c4bf 100644 --- a/app/market/mocks/client.go +++ b/app/market/mocks/client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v1.0.0 +// Code generated by mockery v1.0.0. DO NOT EDIT. 
package mocks import core_types "github.com/tendermint/tendermint/rpc/core/types" diff --git a/app/market/mocks/engine.go b/app/market/mocks/engine.go index 5095010fd9..a7d5934033 100644 --- a/app/market/mocks/engine.go +++ b/app/market/mocks/engine.go @@ -1,4 +1,4 @@ -// Code generated by mockery v1.0.0 +// Code generated by mockery v1.0.0. DO NOT EDIT. package mocks import mock "github.com/stretchr/testify/mock" diff --git a/app/market/mocks/facilitator.go b/app/market/mocks/facilitator.go index a18531f05e..d6ebcdd605 100644 --- a/app/market/mocks/facilitator.go +++ b/app/market/mocks/facilitator.go @@ -1,4 +1,4 @@ -// Code generated by mockery v1.0.0 +// Code generated by mockery v1.0.0. DO NOT EDIT. package mocks import mock "github.com/stretchr/testify/mock" diff --git a/cmd/akash/query_test.go b/cmd/akash/query_test.go index cbeaf07569..aed772f961 100644 --- a/cmd/akash/query_test.go +++ b/cmd/akash/query_test.go @@ -9,6 +9,7 @@ import ( ) func TestAccountQuery_NoNode(t *testing.T) { + testutil.Shrug(t, 283) hexaddr := testutil.HexAddress(t) args := []string{query.QueryCommand().Name(), "account", hexaddr} base := baseCommand() @@ -19,6 +20,7 @@ func TestAccountQuery_NoNode(t *testing.T) { } func TestDeploymentQuery_NoNode(t *testing.T) { + testutil.Shrug(t, 283) hexaddr := testutil.HexDeploymentAddress(t) args := []string{query.QueryCommand().Name(), "deployment", hexaddr} base := baseCommand() @@ -29,6 +31,7 @@ func TestDeploymentQuery_NoNode(t *testing.T) { } func TestOrderQuery_NoNode(t *testing.T) { + testutil.Shrug(t, 283) hexaddr := testutil.HexDeploymentAddress(t) args := []string{query.QueryCommand().Name(), "order", hexaddr} base := baseCommand() @@ -39,6 +42,7 @@ func TestOrderQuery_NoNode(t *testing.T) { } func TestProviderQuery_NoNode(t *testing.T) { + testutil.Shrug(t, 283) hexaddr := testutil.HexDeploymentAddress(t) args := []string{query.QueryCommand().Name(), "provider", hexaddr} base := baseCommand() diff --git a/cmd/akash/session/base.go b/cmd/akash/session/base.go index d2352bf960..5b507a574d 100644 --- a/cmd/akash/session/base.go +++ b/cmd/akash/session/base.go @@ -22,6 +22,7 @@ const ( defaultKeyType = "ed25519" defaultCodec = "english" defaultPassword = "0123456789" + defaultHost = "localhost" ) func SetupBaseCommand(cmd *cobra.Command) { diff --git a/cmd/akash/session/flags.go b/cmd/akash/session/flags.go index d339252369..6e3e881382 100644 --- a/cmd/akash/session/flags.go +++ b/cmd/akash/session/flags.go @@ -32,7 +32,7 @@ func AddFlagWait(cmd *cobra.Command, flags *pflag.FlagSet) { } func AddFlagHost(cmd *cobra.Command, flags *pflag.FlagSet) { - flags.String(flagHost, "", "cluster host") + flags.String(flagHost, defaultHost, "cluster host") viper.BindPFlag(flagHost, flags.Lookup(flagHost)) } diff --git a/cmd/akash/session/session.go b/cmd/akash/session/session.go index 0582f874e6..d19d0566e7 100644 --- a/cmd/akash/session/session.go +++ b/cmd/akash/session/session.go @@ -262,7 +262,7 @@ func loadKeyManager(root string) (keys.Keybase, tmdb.DB, error) { } func (ctx *session) Host() string { - if len(ctx.cmd.Flag(flagHost).Value.String()) > 0 { + if ctx.cmd.Flag(flagHost).Value.String() != ctx.cmd.Flag(flagHost).DefValue { return ctx.cmd.Flag(flagHost).Value.String() } return viper.GetString(flagHost) diff --git a/marketplace/mocks/handler.go b/marketplace/mocks/handler.go index 81bc2db149..f5c4b8451c 100644 --- a/marketplace/mocks/handler.go +++ b/marketplace/mocks/handler.go @@ -1,4 +1,4 @@ -// Code generated by mockery v1.0.0 +// Code generated by mockery 
v1.0.0. DO NOT EDIT. package mocks import mock "github.com/stretchr/testify/mock" diff --git a/provider/cluster/client.go b/provider/cluster/client.go index dd209e5f6e..e175b3b932 100644 --- a/provider/cluster/client.go +++ b/provider/cluster/client.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "io" + "sync" "github.com/ovrclk/akash/types" ) @@ -11,12 +12,35 @@ import ( type Client interface { Deploy(types.LeaseID, *types.ManifestGroup) error TeardownLease(types.LeaseID) error - TeardownNamespace(string) error Deployments() ([]Deployment, error) LeaseStatus(types.LeaseID) (*types.LeaseStatusResponse, error) ServiceStatus(types.LeaseID, string) (*types.ServiceStatusResponse, error) ServiceLogs(context.Context, types.LeaseID, int64, bool) ([]*ServiceLog, error) + + Inventory() ([]Node, error) +} + +type Node interface { + ID() string + Available() types.ResourceUnit +} + +type node struct { + id string + available types.ResourceUnit +} + +func NewNode(id string, available types.ResourceUnit) Node { + return &node{id: id, available: available} +} + +func (n *node) ID() string { + return n.id +} + +func (n *node) Available() types.ResourceUnit { + return n.available } type Deployment interface { @@ -30,7 +54,16 @@ type ServiceLog struct { Scanner *bufio.Scanner } -type nullClient int +const ( + // 5 CPUs, 5Gi memory for null client. + nullClientCPU = 5 + nullClientMemory = 5 * 1024 * 1024 * 1024 +) + +type nullClient struct { + leases map[string]*types.ManifestGroup + mtx sync.Mutex +} func NewServiceLog(name string, stream io.ReadCloser) *ServiceLog { return &ServiceLog{ @@ -41,33 +74,65 @@ func NewServiceLog(name string, stream io.ReadCloser) *ServiceLog { } func NullClient() Client { - return nullClient(0) + return &nullClient{ + leases: make(map[string]*types.ManifestGroup), + mtx: sync.Mutex{}, + } } -func (nullClient) Deploy(_ types.LeaseID, _ *types.ManifestGroup) error { +func (c *nullClient) Deploy(lid types.LeaseID, mgroup *types.ManifestGroup) error { + c.mtx.Lock() + defer c.mtx.Unlock() + c.leases[lid.String()] = mgroup return nil } -func (nullClient) LeaseStatus(_ types.LeaseID) (*types.LeaseStatusResponse, error) { - return nil, nil +func (c *nullClient) LeaseStatus(lid types.LeaseID) (*types.LeaseStatusResponse, error) { + c.mtx.Lock() + defer c.mtx.Unlock() + + mgroup, ok := c.leases[lid.String()] + if !ok { + return nil, nil + } + + resp := &types.LeaseStatusResponse{} + for _, svc := range mgroup.Services { + resp.Services = append(resp.Services, &types.ServiceStatus{ + Name: svc.Name, + Available: int32(svc.Count), + Total: int32(svc.Count), + }) + } + + return resp, nil } -func (nullClient) ServiceStatus(_ types.LeaseID, _ string) (*types.ServiceStatusResponse, error) { +func (c *nullClient) ServiceStatus(_ types.LeaseID, _ string) (*types.ServiceStatusResponse, error) { return nil, nil } -func (nullClient) ServiceLogs(_ context.Context, _ types.LeaseID, _ int64, _ bool) ([]*ServiceLog, error) { +func (c *nullClient) ServiceLogs(_ context.Context, _ types.LeaseID, _ int64, _ bool) ([]*ServiceLog, error) { return nil, nil } -func (nullClient) TeardownLease(_ types.LeaseID) error { - return nil -} +func (c *nullClient) TeardownLease(lid types.LeaseID) error { + c.mtx.Lock() + defer c.mtx.Unlock() -func (nullClient) TeardownNamespace(_ string) error { + delete(c.leases, lid.String()) return nil } -func (nullClient) Deployments() ([]Deployment, error) { +func (c *nullClient) Deployments() ([]Deployment, error) { return nil, nil } + +func (c *nullClient) Inventory() ([]Node, 
error) { + return []Node{ + NewNode("solo", types.ResourceUnit{ + CPU: nullClientCPU, + Memory: nullClientMemory, + }), + }, nil +} diff --git a/provider/cluster/kube/apply.go b/provider/cluster/kube/apply.go index bf4f35b7d9..5f507c4792 100644 --- a/provider/cluster/kube/apply.go +++ b/provider/cluster/kube/apply.go @@ -1,6 +1,7 @@ package kube import ( + akashv1 "github.com/ovrclk/akash/pkg/client/clientset/versioned" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -88,18 +89,18 @@ func prepareEnvironment(kc kubernetes.Interface, ns string) error { return err } -func applyManifest(c *client, b *manifestBuilder) error { - obj, err := c.mc.AkashV1().Manifests(c.ns).Get(b.name(), metav1.GetOptions{}) +func applyManifest(kc akashv1.Interface, b *manifestBuilder) error { + obj, err := kc.AkashV1().Manifests(b.ns()).Get(b.name(), metav1.GetOptions{}) switch { case err == nil: obj, err = b.update(obj) if err == nil { - _, err = c.mc.AkashV1().Manifests(c.ns).Update(obj) + _, err = kc.AkashV1().Manifests(b.ns()).Update(obj) } case errors.IsNotFound(err): obj, err = b.create() if err == nil { - _, err = c.mc.AkashV1().Manifests(c.ns).Create(obj) + _, err = kc.AkashV1().Manifests(b.ns()).Create(obj) } } return err diff --git a/provider/cluster/kube/builder.go b/provider/cluster/kube/builder.go index 3a792b7b43..6681bdc0fb 100644 --- a/provider/cluster/kube/builder.go +++ b/provider/cluster/kube/builder.go @@ -266,20 +266,26 @@ func lidNS(lid types.LeaseID) string { // manifest type manifestBuilder struct { builder + mns string } -func newManifestBuilder(lid types.LeaseID, group *types.ManifestGroup) *manifestBuilder { +func newManifestBuilder(ns string, lid types.LeaseID, group *types.ManifestGroup) *manifestBuilder { return &manifestBuilder{ builder: builder{lid, group}, + mns: ns, } } +func (b *manifestBuilder) ns() string { + return b.mns +} + func (b *manifestBuilder) create() (*akashv1.Manifest, error) { - return akashv1.NewManifest(b.ns(), &b.lid, b.group) + return akashv1.NewManifest(lidNS(b.lid), &b.lid, b.group) } func (b *manifestBuilder) update(obj *akashv1.Manifest) (*akashv1.Manifest, error) { - return akashv1.NewManifest(b.ns(), &b.lid, b.group) + return akashv1.NewManifest(lidNS(b.lid), &b.lid, b.group) } func (b *manifestBuilder) name() string { diff --git a/provider/cluster/kube/client.go b/provider/cluster/kube/client.go index 4ecc4af13b..771ee7bd25 100644 --- a/provider/cluster/kube/client.go +++ b/provider/cluster/kube/client.go @@ -11,6 +11,7 @@ import ( "github.com/ovrclk/akash/provider/cluster" "github.com/ovrclk/akash/types" "github.com/tendermint/tmlibs/log" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apiextcs "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -56,12 +57,12 @@ func NewClient(log log.Logger, host, ns string) (Client, error) { err = akashv1.CreateCRD(mcr) if err != nil { - panic(err) + return nil, fmt.Errorf("error creating akashv1 CRD: %v", err) } err = prepareEnvironment(kc, ns) if err != nil { - panic(err) + return nil, fmt.Errorf("error preparing environment %v", err) } _, err = kc.CoreV1().Namespaces().List(metav1.ListOptions{Limit: 1}) @@ -119,7 +120,7 @@ func (c *client) Deploy(lid types.LeaseID, group *types.ManifestGroup) error { return err } - if err := applyManifest(c, newManifestBuilder(lid, group)); err != nil { + if err := applyManifest(c.mc, newManifestBuilder(c.ns, lid, group)); err != nil { 
c.log.Error("applying manifest", "err", err, "lease", lid) return err } @@ -158,10 +159,6 @@ func (c *client) TeardownLease(lid types.LeaseID) error { return c.kc.CoreV1().Namespaces().Delete(lidNS(lid), &metav1.DeleteOptions{}) } -func (c *client) TeardownNamespace(ns string) error { - return c.kc.CoreV1().Namespaces().Delete(ns, &metav1.DeleteOptions{}) -} - func (c *client) ServiceLogs(ctx context.Context, lid types.LeaseID, tailLines int64, follow bool) ([]*cluster.ServiceLog, error) { pods, err := c.kc.CoreV1().Pods(lidNS(lid)).List(metav1.ListOptions{}) @@ -187,16 +184,13 @@ func (c *client) ServiceLogs(ctx context.Context, lid types.LeaseID, // todo: limit number of results and do pagination / streaming func (c *client) LeaseStatus(lid types.LeaseID) (*types.LeaseStatusResponse, error) { - deployments, err := c.kc.AppsV1().Deployments(lidNS(lid)).List(metav1.ListOptions{}) + deployments, err := c.deploymentsForLease(lid) if err != nil { c.log.Error(err.Error()) - return nil, types.ErrInternalError{Message: "internal error"} - } - if deployments == nil || len(deployments.Items) == 0 { - return nil, types.ErrResourceNotFound{Message: "no deployments for lease"} + return nil, err } - serviceStatus := make(map[string]*types.ServiceStatus, len(deployments.Items)) - for _, deployment := range deployments.Items { + serviceStatus := make(map[string]*types.ServiceStatus, len(deployments)) + for _, deployment := range deployments { status := &types.ServiceStatus{ Name: deployment.Name, Available: deployment.Status.AvailableReplicas, @@ -244,3 +238,81 @@ func (c *client) ServiceStatus(lid types.LeaseID, name string) (*types.ServiceSt AvailableReplicas: deployment.Status.AvailableReplicas, }, nil } + +func (c *client) Inventory() ([]cluster.Node, error) { + var nodes []cluster.Node + + knodes, err := c.kc.CoreV1().Nodes().List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + + for _, knode := range knodes.Items { + if !c.nodeIsActive(&knode) { + continue + } + + unit := types.ResourceUnit{ + CPU: uint32(knode.Status.Allocatable.Cpu().Value()), + Memory: uint64(knode.Status.Capacity.Memory().Value()), + Disk: uint64(knode.Status.Capacity.StorageEphemeral().Value()), + } + + nodes = append(nodes, cluster.NewNode(knode.Name, unit)) + } + + return nodes, nil +} + +func (c *client) nodeIsActive(node *corev1.Node) bool { + ready := false + issues := 0 + + for _, cond := range node.Status.Conditions { + switch cond.Type { + + case corev1.NodeReady: + + if cond.Status == corev1.ConditionTrue { + ready = true + } + + case corev1.NodeOutOfDisk: + fallthrough + case corev1.NodeMemoryPressure: + fallthrough + case corev1.NodeDiskPressure: + fallthrough + case corev1.NodePIDPressure: + fallthrough + case corev1.NodeNetworkUnavailable: + + if cond.Status != corev1.ConditionFalse { + + c.log.Error("node in poor condition", + "node", node.Name, + "condition", cond.Type, + "status", cond.Status) + + issues++ + } + + case corev1.NodeKubeletConfigOk: + // ignored + } + } + + return ready && issues == 0 +} + +func (c *client) deploymentsForLease(lid types.LeaseID) ([]appsv1.Deployment, error) { + deployments, err := c.kc.AppsV1().Deployments(lidNS(lid)).List(metav1.ListOptions{}) + if err != nil { + c.log.Error(err.Error()) + return nil, types.ErrInternalError{Message: "internal error"} + } + if deployments == nil { + return nil, types.ErrResourceNotFound{Message: "no deployments for lease"} + } + return deployments.Items, nil +} diff --git a/provider/cluster/kube/client_test.go 
b/provider/cluster/kube/client_test.go index a6341fe20b..af90e8f2f8 100644 --- a/provider/cluster/kube/client_test.go +++ b/provider/cluster/kube/client_test.go @@ -16,11 +16,6 @@ func kubeClient(t *testing.T) Client { return client } -func tearDown(client Client, t *testing.T) { - err := client.TeardownNamespace(strings.ToLower(t.Name())) - assert.NoError(t, err) -} - func leaseID(t *testing.T) types.LeaseID { return types.LeaseID{ Deployment: []byte(t.Name()), diff --git a/provider/cluster/kube/mocks/client.go b/provider/cluster/kube/mocks/client.go index 3d62d37b5e..99ed19cd27 100644 --- a/provider/cluster/kube/mocks/client.go +++ b/provider/cluster/kube/mocks/client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v1.0.0 +// Code generated by mockery v1.0.0. DO NOT EDIT. package mocks import cluster "github.com/ovrclk/akash/provider/cluster" @@ -49,6 +49,29 @@ func (_m *Client) Deployments() ([]cluster.Deployment, error) { return r0, r1 } +// Inventory provides a mock function with given fields: +func (_m *Client) Inventory() ([]cluster.Node, error) { + ret := _m.Called() + + var r0 []cluster.Node + if rf, ok := ret.Get(0).(func() []cluster.Node); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]cluster.Node) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // LeaseStatus provides a mock function with given fields: _a0 func (_m *Client) LeaseStatus(_a0 types.LeaseID) (*types.LeaseStatusResponse, error) { ret := _m.Called(_a0) @@ -131,17 +154,3 @@ func (_m *Client) TeardownLease(_a0 types.LeaseID) error { return r0 } - -// TeardownNamespace provides a mock function with given fields: _a0 -func (_m *Client) TeardownNamespace(_a0 string) error { - ret := _m.Called(_a0) - - var r0 error - if rf, ok := ret.Get(0).(func(string) error); ok { - r0 = rf(_a0) - } else { - r0 = ret.Error(0) - } - - return r0 -} diff --git a/provider/cluster/manager.go b/provider/cluster/manager.go new file mode 100644 index 0000000000..95b5d2d251 --- /dev/null +++ b/provider/cluster/manager.go @@ -0,0 +1,231 @@ +package cluster + +import ( + "fmt" + "sync" + + lifecycle "github.com/boz/go-lifecycle" + "github.com/ovrclk/akash/provider/session" + "github.com/ovrclk/akash/types" + "github.com/tendermint/tmlibs/log" +) + +type deploymentState string + +const ( + dsDeployActive deploymentState = "deploy-active" + dsDeployPending = "deploy-pending" + dsDeployComplete = "deploy-complete" + dsTeardownActive = "teardown-active" + dsTeardownPending = "teardown-pending" + dsTeardownComplete = "teardown-complete" +) + +type deploymentManager struct { + client Client + session session.Session + + state deploymentState + + lease types.LeaseID + mgroup *types.ManifestGroup + + monitor *deploymentMonitor + wg sync.WaitGroup + + updatech chan *types.ManifestGroup + teardownch chan struct{} + + log log.Logger + lc lifecycle.Lifecycle +} + +func newDeploymentManager(s *service, lease types.LeaseID, mgroup *types.ManifestGroup) *deploymentManager { + + log := s.log.With("cmp", "deployment-manager", + "lease", lease, "manifest-group", mgroup.Name) + + dm := &deploymentManager{ + client: s.client, + session: s.session, + state: dsDeployActive, + lease: lease, + mgroup: mgroup, + wg: sync.WaitGroup{}, + updatech: make(chan *types.ManifestGroup), + teardownch: make(chan struct{}), + log: log, + lc: lifecycle.New(), + } + + go dm.lc.WatchChannel(s.lc.ShuttingDown()) + go dm.run() + + go func() { + <-dm.lc.Done() + 
s.managerch <- dm + }() + + return dm +} + +func (dm *deploymentManager) update(mgroup *types.ManifestGroup) error { + select { + case dm.updatech <- mgroup: + return nil + case <-dm.lc.ShuttingDown(): + return fmt.Errorf("not running") + } +} + +func (dm *deploymentManager) teardown() error { + select { + case dm.teardownch <- struct{}{}: + return nil + case <-dm.lc.ShuttingDown(): + return fmt.Errorf("not running") + } +} + +func (dm *deploymentManager) run() { + defer dm.lc.ShutdownCompleted() + + runch := dm.startDeploy() + +loop: + for { + select { + + case err := <-dm.lc.ShutdownRequest(): + dm.lc.ShutdownInitiated(err) + break loop + + case mgroup := <-dm.updatech: + + dm.mgroup = mgroup + + switch dm.state { + case dsDeployActive: + dm.mgroup = mgroup + dm.state = dsDeployPending + case dsDeployPending: + dm.mgroup = mgroup + case dsDeployComplete: + dm.mgroup = mgroup + + // start update + runch = dm.startDeploy() + + case dsTeardownActive, dsTeardownPending, dsTeardownComplete: + } + + case result := <-runch: + runch = nil + + if result != nil { + dm.log.Error("execution error", "state", dm.state, "err", result) + } + + switch dm.state { + case dsDeployActive: + dm.log.Debug("deploy complete") + dm.state = dsDeployComplete + + dm.startMonitor() + + case dsDeployPending: + // start update + runch = dm.startDeploy() + + case dsDeployComplete: + + panic(fmt.Errorf("INVALID STATE: runch read on %v", dm.state)) + + case dsTeardownActive: + dm.state = dsTeardownComplete + break loop + + case dsTeardownPending: + + // start teardown + runch = dm.startTeardown() + + case dsTeardownComplete: + + panic(fmt.Errorf("INVALID STATE: runch read on %v", dm.state)) + + } + + case <-dm.teardownch: + dm.log.Debug("teardown request") + + dm.stopMonitor() + + switch dm.state { + case dsDeployActive: + + dm.state = dsTeardownPending + + case dsDeployPending: + + dm.state = dsTeardownPending + + case dsDeployComplete: + + // start teardown + runch = dm.startTeardown() + + case dsTeardownActive, dsTeardownPending, dsTeardownComplete: + } + } + } + + if runch != nil { + <-runch + } + + dm.wg.Wait() +} + +func (dm *deploymentManager) startMonitor() { + dm.wg.Add(1) + dm.monitor = newDeploymentMonitor(dm.log, dm.lc.ShuttingDown(), dm.session, dm.client, dm.lease, dm.mgroup) + go func(m *deploymentMonitor) { + defer dm.wg.Done() + <-m.done() + }(dm.monitor) +} + +func (dm *deploymentManager) stopMonitor() { + if dm.monitor != nil { + dm.monitor.shutdown() + } +} + +func (dm *deploymentManager) startDeploy() <-chan error { + dm.stopMonitor() + dm.state = dsDeployActive + return dm.do(dm.doDeploy) +} + +func (dm *deploymentManager) startTeardown() <-chan error { + dm.stopMonitor() + dm.state = dsTeardownActive + return dm.do(dm.doTeardown) +} + +func (dm *deploymentManager) doDeploy() error { + return dm.client.Deploy(dm.lease, dm.mgroup) +} + +func (dm *deploymentManager) doTeardown() error { + return dm.client.TeardownLease(dm.lease) +} + +func (dm *deploymentManager) do(fn func() error) <-chan error { + ch := make(chan error, 1) + go func() { + ch <- fn() + }() + return ch +} diff --git a/provider/cluster/mocks/client.go b/provider/cluster/mocks/client.go index 2904c86f7b..50f5dd839e 100644 --- a/provider/cluster/mocks/client.go +++ b/provider/cluster/mocks/client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v1.0.0 +// Code generated by mockery v1.0.0. DO NOT EDIT. 
package mocks import cluster "github.com/ovrclk/akash/provider/cluster" @@ -48,6 +48,29 @@ func (_m *Client) Deployments() ([]cluster.Deployment, error) { return r0, r1 } +// Inventory provides a mock function with given fields: +func (_m *Client) Inventory() ([]cluster.Node, error) { + ret := _m.Called() + + var r0 []cluster.Node + if rf, ok := ret.Get(0).(func() []cluster.Node); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]cluster.Node) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // LeaseStatus provides a mock function with given fields: _a0 func (_m *Client) LeaseStatus(_a0 types.LeaseID) (*types.LeaseStatusResponse, error) { ret := _m.Called(_a0) @@ -130,17 +153,3 @@ func (_m *Client) TeardownLease(_a0 types.LeaseID) error { return r0 } - -// TeardownNamespace provides a mock function with given fields: _a0 -func (_m *Client) TeardownNamespace(_a0 string) error { - ret := _m.Called(_a0) - - var r0 error - if rf, ok := ret.Get(0).(func(string) error); ok { - r0 = rf(_a0) - } else { - r0 = ret.Error(0) - } - - return r0 -} diff --git a/provider/cluster/mocks/cluster.go b/provider/cluster/mocks/cluster.go index 7fabc86f67..94ec280495 100644 --- a/provider/cluster/mocks/cluster.go +++ b/provider/cluster/mocks/cluster.go @@ -1,4 +1,4 @@ -// Code generated by mockery v1.0.0 +// Code generated by mockery v1.0.0. DO NOT EDIT. package mocks import cluster "github.com/ovrclk/akash/provider/cluster" diff --git a/provider/cluster/mocks/deployment.go b/provider/cluster/mocks/deployment.go index 014da97ca6..547cd86810 100644 --- a/provider/cluster/mocks/deployment.go +++ b/provider/cluster/mocks/deployment.go @@ -1,4 +1,4 @@ -// Code generated by mockery v1.0.0 +// Code generated by mockery v1.0.0. DO NOT EDIT. package mocks import mock "github.com/stretchr/testify/mock" diff --git a/provider/cluster/mocks/reservation.go b/provider/cluster/mocks/reservation.go index 9839140216..41cfa4ac8c 100644 --- a/provider/cluster/mocks/reservation.go +++ b/provider/cluster/mocks/reservation.go @@ -1,4 +1,4 @@ -// Code generated by mockery v1.0.0 +// Code generated by mockery v1.0.0. DO NOT EDIT. 
package mocks import mock "github.com/stretchr/testify/mock" diff --git a/provider/cluster/monitor.go b/provider/cluster/monitor.go index 19d91904d8..25988df668 100644 --- a/provider/cluster/monitor.go +++ b/provider/cluster/monitor.go @@ -1,192 +1,186 @@ package cluster import ( - "fmt" + "math/rand" + "time" lifecycle "github.com/boz/go-lifecycle" + "github.com/ovrclk/akash/provider/session" "github.com/ovrclk/akash/types" + "github.com/ovrclk/akash/util/runner" "github.com/tendermint/tmlibs/log" ) -type deploymentState string - const ( - dsDeployActive deploymentState = "deploy-active" - dsDeployPending = "deploy-pending" - dsDeployComplete = "deploy-complete" - dsTeardownActive = "teardown-active" - dsTeardownPending = "teardown-pending" - dsTeardownComplete = "teardown-complete" + monitorMaxRetries = 20 + monitorRetryPeriodMin = time.Second * 2 + monitorRetryPeriodJitter = time.Second * 5 + + monitorHealthcheckPeriodMin = time.Second * 10 + monitorHealthcheckPeriodJitter = time.Second * 5 ) type deploymentMonitor struct { - client Client - - state deploymentState + session session.Session + client Client lease types.LeaseID mgroup *types.ManifestGroup - updatech chan *types.ManifestGroup - teardownch chan struct{} - - log log.Logger - lc lifecycle.Lifecycle + attempts int + log log.Logger + lc lifecycle.Lifecycle } -func newDeploymentMonitor(s *service, lease types.LeaseID, mgroup *types.ManifestGroup) *deploymentMonitor { - - log := s.log.With("cmp", "deployment-monitor", - "lease", lease, "manifest-group", mgroup.Name) - - dm := &deploymentMonitor{ - client: s.client, - state: dsDeployActive, - lease: lease, - mgroup: mgroup, - updatech: make(chan *types.ManifestGroup), - teardownch: make(chan struct{}), - log: log, - lc: lifecycle.New(), +func newDeploymentMonitor(log log.Logger, + donech <-chan struct{}, + session session.Session, + client Client, + lease types.LeaseID, + mgroup *types.ManifestGroup, +) *deploymentMonitor { + + m := &deploymentMonitor{ + session: session, + client: client, + lease: lease, + mgroup: mgroup, + log: log.With("cmp", "deployment-monitor"), + lc: lifecycle.New(), } - go dm.lc.WatchChannel(s.lc.ShuttingDown()) - go dm.run() - - go func() { - <-dm.lc.Done() - s.monitorch <- dm - }() + go m.lc.WatchChannel(donech) + go m.run() - return dm + return m } -func (dm *deploymentMonitor) update(mgroup *types.ManifestGroup) error { - select { - case dm.updatech <- mgroup: - return nil - case <-dm.lc.ShuttingDown(): - return fmt.Errorf("not running") - } +func (m *deploymentMonitor) shutdown() { + m.lc.ShutdownAsync(nil) } -func (dm *deploymentMonitor) teardown() error { - select { - case dm.teardownch <- struct{}{}: - return nil - case <-dm.lc.ShuttingDown(): - return fmt.Errorf("not running") - } +func (m *deploymentMonitor) done() <-chan struct{} { + return m.lc.Done() } -func (dm *deploymentMonitor) run() { - defer dm.lc.ShutdownCompleted() - runch := dm.do(dm.doDeploy) +func (m *deploymentMonitor) run() { + defer m.lc.ShutdownCompleted() + + var runch <-chan runner.Result + + tickch := m.scheduleRetry() loop: for { select { - case err := <-dm.lc.ShutdownRequest(): - dm.lc.ShutdownInitiated(err) + case err := <-m.lc.ShutdownRequest(): + m.lc.ShutdownInitiated(err) break loop - case mgroup := <-dm.updatech: - - dm.mgroup = mgroup - - switch dm.state { - case dsDeployActive: - dm.mgroup = mgroup - case dsDeployPending: - dm.mgroup = mgroup - case dsDeployComplete: - dm.mgroup = mgroup - // start update - dm.state = dsDeployActive - runch = dm.do(dm.doDeploy) - - 
case dsTeardownActive, dsTeardownPending, dsTeardownComplete: - } + case <-tickch: + tickch = nil + runch = m.runCheck() case result := <-runch: runch = nil - if result != nil { - dm.log.Error("execution error", "state", dm.state, "err", result) + if err := result.Error(); err != nil { + m.log.Error("monitor check", "err", err) } - switch dm.state { - case dsDeployActive: - dm.log.Debug("deploy complete") - - dm.state = dsDeployComplete - - case dsDeployPending: - // start update - dm.state = dsDeployActive - runch = dm.do(dm.doDeploy) + ok := result.Value().(bool) - case dsDeployComplete: + m.log.Info("check result", "ok", ok, "attempt", m.attempts) - panic(fmt.Errorf("INVALID STATE: runch read on %v", dm.state)) + if ok { + // healthy - case dsTeardownActive: - dm.state = dsTeardownComplete - break loop + m.attempts = 0 + tickch = m.scheduleHealthcheck() + break + } - case dsTeardownPending: + if m.attempts <= monitorMaxRetries { + // unhealthy. retry - // start teardown - dm.state = dsTeardownActive - runch = dm.do(dm.doTeardown) + tickch = m.scheduleRetry() + break + } - case dsTeardownComplete: + // deployment failed. cancel. - panic(fmt.Errorf("INVALID STATE: runch read on %v", dm.state)) + m.log.Info("deployment failed. closing.") + // TODO: retry. + if _, err := m.session.TX().BroadcastTxCommit(&types.TxCloseLease{ + LeaseID: m.lease, + }); err != nil { + m.log.Error("closing deployment", "err", err) } + } + } - case <-dm.teardownch: - dm.log.Debug("teardown request") - - switch dm.state { - case dsDeployActive: + if runch != nil { + <-runch + } +} - dm.state = dsTeardownPending +func (m *deploymentMonitor) runCheck() <-chan runner.Result { + m.attempts++ + m.log.Debug("running check", "attempt", m.attempts) + return runner.Do(func() runner.Result { + return runner.NewResult(m.doCheck()) + }) +} - case dsDeployPending: +func (m *deploymentMonitor) doCheck() (bool, error) { + status, err := m.client.LeaseStatus(m.lease) - dm.state = dsTeardownPending + if err != nil { + m.log.Error("lease status", "err", err) + return false, err + } - case dsDeployComplete: + badsvc := 0 - // start teardown - dm.state = dsTeardownActive - runch = dm.do(dm.doTeardown) + for _, spec := range m.mgroup.Services { - case dsTeardownActive, dsTeardownPending, dsTeardownComplete: + found := false + for _, svc := range status.Services { + if svc.Name != spec.Name { + continue + } + found = true + + if uint32(svc.Available) < spec.Count { + badsvc++ + m.log.Debug("service available replicas below target", + "service", spec.Name, + "available", svc.Available, + "target", spec.Count, + ) } } - } - if runch != nil { - <-runch + if !found { + badsvc++ + m.log.Debug("service status found", "service", spec.Name) + } } + + return badsvc == 0, nil } -func (dm *deploymentMonitor) doDeploy() error { - return dm.client.Deploy(dm.lease, dm.mgroup) +func (m *deploymentMonitor) scheduleRetry() <-chan time.Time { + return m.schedule(monitorRetryPeriodMin, monitorRetryPeriodJitter) } -func (dm *deploymentMonitor) doTeardown() error { - return dm.client.TeardownLease(dm.lease) +func (m *deploymentMonitor) scheduleHealthcheck() <-chan time.Time { + return m.schedule(monitorHealthcheckPeriodMin, monitorHealthcheckPeriodJitter) } -func (dm *deploymentMonitor) do(fn func() error) <-chan error { - ch := make(chan error, 1) - go func() { - ch <- fn() - }() - return ch +func (m *deploymentMonitor) schedule(min, jitter time.Duration) <-chan time.Time { + period := min + time.Duration(rand.Int63n(int64(jitter))) + return 
time.After(period) } diff --git a/provider/cluster/service.go b/provider/cluster/service.go index 196ad8a2e8..2829c111cd 100644 --- a/provider/cluster/service.go +++ b/provider/cluster/service.go @@ -8,6 +8,7 @@ import ( lifecycle "github.com/boz/go-lifecycle" "github.com/ovrclk/akash/provider/event" + "github.com/ovrclk/akash/provider/session" "github.com/ovrclk/akash/types" "github.com/tendermint/tmlibs/log" ) @@ -25,9 +26,9 @@ type Service interface { Done() <-chan struct{} } -func NewService(log log.Logger, ctx context.Context, bus event.Bus, client Client) (Service, error) { +func NewService(ctx context.Context, session session.Session, bus event.Bus, client Client) (Service, error) { - log = log.With("module", "provider-cluster") + log := session.Log().With("module", "provider-cluster") sub, err := bus.Subscribe() if err != nil { @@ -43,11 +44,12 @@ func NewService(log log.Logger, ctx context.Context, bus event.Bus, client Clien log.Info("found managed deployments", "count", len(deployments)) s := &service{ + session: session, client: client, bus: bus, sub: sub, deployments: make(map[string]*managedDeployment), - monitorch: make(chan *deploymentMonitor), + managerch: make(chan *deploymentManager), reservech: make(chan reserveRequest), log: log, lc: lifecycle.New(), @@ -60,14 +62,15 @@ func NewService(log log.Logger, ctx context.Context, bus event.Bus, client Clien } type service struct { - client Client - bus event.Bus - sub event.Subscriber + session session.Session + client Client + bus event.Bus + sub event.Subscriber deployments map[string]*managedDeployment reservech chan reserveRequest - monitorch chan *deploymentMonitor + managerch chan *deploymentManager log log.Logger lc lifecycle.Lifecycle @@ -75,7 +78,7 @@ type service struct { type managedDeployment struct { reservation Reservation - monitor *deploymentMonitor + manager *deploymentManager } func (s *service) Close() error { @@ -123,7 +126,7 @@ func (s *service) run(deployments []Deployment) { // TODO: recover reservation key := deployment.LeaseID().OrderID().String() s.deployments[key] = &managedDeployment{ - monitor: newDeploymentMonitor(s, deployment.LeaseID(), deployment.ManifestGroup()), + manager: newDeploymentManager(s, deployment.LeaseID(), deployment.ManifestGroup()), reservation: newReservation(deployment.LeaseID().OrderID(), nil), } } @@ -155,12 +158,12 @@ loop: break } - if state.monitor == nil { - state.monitor = newDeploymentMonitor(s, ev.LeaseID, mgroup) + if state.manager == nil { + state.manager = newDeploymentManager(s, ev.LeaseID, mgroup) break } - if err := state.monitor.update(mgroup); err != nil { + if err := state.manager.update(mgroup); err != nil { s.log.Error("updating deployment", "err", err, "lease", ev.LeaseID) } @@ -178,6 +181,10 @@ loop: s.teardownOrder(ev.OrderID()) + case *event.TxCloseLease: + + s.teardownOrder(ev.OrderID()) + } case req := <-s.reservech: // TODO: handle inventory @@ -199,9 +206,9 @@ loop: req.ch <- reserveResponse{state.reservation, nil} - case dm := <-s.monitorch: + case dm := <-s.managerch: - s.log.Debug("monitor done", "order", dm.lease.OrderID()) + s.log.Debug("manager done", "order", dm.lease.OrderID()) // todo: unreserve resources @@ -209,12 +216,12 @@ loop: } } - s.log.Debug("draining deployment monitors...") + s.log.Debug("draining deployment managers...") for _, state := range s.deployments { - if state.monitor != nil { - dm := <-s.monitorch - s.log.Debug("monitor done", "order", dm.lease.OrderID()) + if state.manager != nil { + dm := <-s.managerch + 
s.log.Debug("manager done", "order", dm.lease.OrderID()) } } @@ -229,12 +236,12 @@ func (s *service) teardownOrder(oid types.OrderID) { s.log.Debug("unregistering order", "order", oid) - if state.monitor == nil { + if state.manager == nil { delete(s.deployments, key) return } - if err := state.monitor.teardown(); err != nil { + if err := state.manager.teardown(); err != nil { s.log.Error("tearing down deployment", "err", err, "order", oid) } } diff --git a/provider/cluster/service_test.go b/provider/cluster/service_test.go index 71b1f543ef..06e14a6b7f 100644 --- a/provider/cluster/service_test.go +++ b/provider/cluster/service_test.go @@ -7,7 +7,10 @@ import ( "github.com/ovrclk/akash/provider/cluster" "github.com/ovrclk/akash/provider/cluster/mocks" "github.com/ovrclk/akash/provider/event" + "github.com/ovrclk/akash/provider/session" + qmocks "github.com/ovrclk/akash/query/mocks" "github.com/ovrclk/akash/testutil" + txumocks "github.com/ovrclk/akash/txutil/mocks" "github.com/ovrclk/akash/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -15,14 +18,15 @@ import ( ) func TestService_Reserve(t *testing.T) { - log := testutil.Logger() ctx, cancel := context.WithCancel(context.Background()) defer cancel() bus := event.NewBus() defer bus.Close() - c, err := cluster.NewService(log, ctx, bus, cluster.NullClient()) + session := providerSession(t) + + c, err := cluster.NewService(ctx, session, bus, cluster.NullClient()) require.NoError(t, err) group := testutil.DeploymentGroups(testutil.DeploymentAddress(t), 1).Items[0] @@ -59,8 +63,6 @@ func TestService_Teardown_TxCloseFulfillment(t *testing.T) { } func withServiceTestSetup(t *testing.T, fn func(event.Bus, types.LeaseID)) { - - log := testutil.Logger() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -96,7 +98,11 @@ func withServiceTestSetup(t *testing.T, fn func(event.Bus, types.LeaseID)) { Return(nil, nil). Once() - c, err := cluster.NewService(log, ctx, bus, client) + client.On("LeaseStatus", lease.LeaseID). + Return(&types.LeaseStatusResponse{}, nil). + Maybe() + + c, err := cluster.NewService(ctx, providerSession(t), bus, client) require.NoError(t, err) _, err = c.Reserve(order.OrderID, group) @@ -118,3 +124,11 @@ func withServiceTestSetup(t *testing.T, fn func(event.Bus, types.LeaseID)) { require.NoError(t, c.Close()) mock.AssertExpectationsForObjects(t, client) } + +func providerSession(t *testing.T) session.Session { + log := testutil.Logger() + txc := new(txumocks.Client) + qc := new(qmocks.Client) + provider := testutil.Provider(testutil.Address(t), 1) + return session.New(log, provider, txc, qc) +} diff --git a/provider/event/tx.go b/provider/event/tx.go index 71ec945615..a6bed8c93d 100644 --- a/provider/event/tx.go +++ b/provider/event/tx.go @@ -18,6 +18,7 @@ type ( TxCreateLease = types.TxCreateLease TxCloseDeployment = types.TxCloseDeployment TxCloseFulfillment = types.TxCloseFulfillment + TxCloseLease = types.TxCloseLease ) // Wrap tendermint event bus - publish events from tendermint bus to our bus implementation. @@ -29,19 +30,22 @@ func MarketplaceTxPublisher(ctx context.Context, log log.Logger, tmbus tmtmtypes func MarketplaceTxHandler(bus Bus) marketplace.Handler { return marketplace.NewBuilder(). OnTxCreateOrder(func(tx *types.TxCreateOrder) { - bus.Publish((*TxCreateOrder)(tx)) + bus.Publish(tx) }). OnTxCreateFulfillment(func(tx *types.TxCreateFulfillment) { - bus.Publish((*TxCreateFulfillment)(tx)) + bus.Publish(tx) }). 
OnTxCreateLease(func(tx *types.TxCreateLease) { - bus.Publish((*TxCreateLease)(tx)) + bus.Publish(tx) }). OnTxCloseDeployment(func(tx *types.TxCloseDeployment) { - bus.Publish((*TxCloseDeployment)(tx)) + bus.Publish(tx) }). OnTxCloseFulfillment(func(tx *types.TxCloseFulfillment) { - bus.Publish((*TxCloseFulfillment)(tx)) + bus.Publish(tx) + }). + OnTxCloseLease(func(tx *types.TxCloseLease) { + bus.Publish(tx) }). Create() } diff --git a/provider/manifest/mocks/handler.go b/provider/manifest/mocks/handler.go index a1d294e4e6..f195df59c4 100644 --- a/provider/manifest/mocks/handler.go +++ b/provider/manifest/mocks/handler.go @@ -1,4 +1,4 @@ -// Code generated by mockery v1.0.0 +// Code generated by mockery v1.0.0. DO NOT EDIT. package mocks import mock "github.com/stretchr/testify/mock" diff --git a/provider/service.go b/provider/service.go index c5687662a3..aed2abcde7 100644 --- a/provider/service.go +++ b/provider/service.go @@ -24,7 +24,7 @@ func NewService(ctx context.Context, session session.Session, bus event.Bus, ccl session = session.ForModule("provider-service") - cluster, err := cluster.NewService(session.Log(), ctx, bus, cclient) + cluster, err := cluster.NewService(ctx, session, bus, cclient) if err != nil { cancel() return nil, err diff --git a/query/mocks/client.go b/query/mocks/client.go index f6609dfb5d..c79c51df01 100644 --- a/query/mocks/client.go +++ b/query/mocks/client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v1.0.0 +// Code generated by mockery v1.0.0. DO NOT EDIT. package mocks import context "context" diff --git a/txutil/mocks/client.go b/txutil/mocks/client.go index 84555dc7ef..f9519ff125 100644 --- a/txutil/mocks/client.go +++ b/txutil/mocks/client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v1.0.0 +// Code generated by mockery v1.0.0. DO NOT EDIT. package mocks import core_types "github.com/tendermint/tendermint/rpc/core/types"