From afb0430952b07c95b7e599c80721a27d21f34080 Mon Sep 17 00:00:00 2001 From: Artur Troian Date: Wed, 12 Jan 2022 16:47:41 -0500 Subject: [PATCH] refactor: remove deploy command Signed-off-by: Artur Troian --- cmd/akash/cmd/root.go | 2 - deploy/cmd/create.go | 464 ---------------------------------- deploy/cmd/deployment-data.go | 143 ----------- deploy/cmd/event-emitters.go | 59 ----- deploy/cmd/event-handlers.go | 179 ------------- deploy/cmd/root.go | 34 --- deploy/cmd/utils.go | 68 ----- 7 files changed, 949 deletions(-) delete mode 100644 deploy/cmd/create.go delete mode 100644 deploy/cmd/deployment-data.go delete mode 100644 deploy/cmd/event-emitters.go delete mode 100644 deploy/cmd/event-handlers.go delete mode 100644 deploy/cmd/root.go delete mode 100644 deploy/cmd/utils.go diff --git a/cmd/akash/cmd/root.go b/cmd/akash/cmd/root.go index de0fd0a09d..725d22d66f 100644 --- a/cmd/akash/cmd/root.go +++ b/cmd/akash/cmd/root.go @@ -38,7 +38,6 @@ import ( dbm "github.com/tendermint/tm-db" "github.com/ovrclk/akash/app" - dcmd "github.com/ovrclk/akash/deploy/cmd" ecmd "github.com/ovrclk/akash/events/cmd" pcmd "github.com/ovrclk/akash/provider/cmd" "github.com/ovrclk/akash/sdkutil" @@ -125,7 +124,6 @@ func initRootCmd(rootCmd *cobra.Command, encodingConfig params.EncodingConfig) { sdkutil.InitSDKConfig() rootCmd.AddCommand( rpc.StatusCommand(), - dcmd.RootCmd(), pcmd.RootCmd(), ecmd.EventCmd(), queryCmd(), diff --git a/deploy/cmd/create.go b/deploy/cmd/create.go deleted file mode 100644 index 1619533807..0000000000 --- a/deploy/cmd/create.go +++ /dev/null @@ -1,464 +0,0 @@ -package cmd - -import ( - "context" - "encoding/json" - "fmt" - "math/rand" - "os" - "sort" - "sync" - "time" - - "github.com/ovrclk/akash/cmd/common" - dtypes "github.com/ovrclk/akash/x/deployment/types/v1beta2" - mtypes "github.com/ovrclk/akash/x/market/types/v1beta2" - - "github.com/avast/retry-go" - "github.com/cosmos/cosmos-sdk/client" - "github.com/cosmos/cosmos-sdk/client/flags" - sdk "github.com/cosmos/cosmos-sdk/types" - ctypes "github.com/ovrclk/akash/provider/cluster/types/v1beta2" - gateway "github.com/ovrclk/akash/provider/gateway/rest" - dcli "github.com/ovrclk/akash/x/deployment/client/cli" - "github.com/pkg/errors" - "github.com/spf13/cobra" - "github.com/spf13/pflag" - "github.com/spf13/viper" - "golang.org/x/sync/errgroup" -) - -const ( - // FlagTimeout represents max amount of time for lease status checking process - FlagTimeout = "timeout" - // FlagTick represents time interval at which lease status is checked - FlagTick = "tick" -) - -func retryIfGatewayClientResponseError(err error) bool { - _, isClientErr := err.(gateway.ClientResponseError) - return isClientErr -} - -var errDeployTimeout = errors.New("Timed out while trying to deploy") -var DefaultDeposit = dtypes.DefaultDeploymentMinDeposit - -// createCmd represents the create command -func createCmd() *cobra.Command { - - cmd := &cobra.Command{ - Use: "create [sdl-file]", - Args: cobra.ExactArgs(1), - Short: "Create a deployment on the akash network", - RunE: func(cmd *cobra.Command, args []string) error { - timeoutDuration := viper.GetDuration(FlagTimeout) - endAt := time.Now().Add(timeoutDuration) - ctx, cancel := context.WithDeadline(cmd.Context(), endAt) - tickDuration := viper.GetDuration(FlagTick) - - maxDelay := tickDuration - const defaultMaxDelay = 15 * time.Second - if maxDelay < defaultMaxDelay { - maxDelay = defaultMaxDelay - } - - retryConfiguration := []retry.Option{ - retry.DelayType(retry.BackOffDelay), - retry.Attempts(9999), // Use a large number here, since a deadline is used on the context - retry.MaxDelay(maxDelay), - retry.Delay(tickDuration), - retry.RetryIf(retryIfGatewayClientResponseError), - retry.Context(ctx), - } - - clientCtx, err := client.GetClientTxContext(cmd) - if err != nil { - return err - } - - gClientDir, err := gateway.NewClientDirectory(cmd.Context(), clientCtx) - if err != nil { - if os.IsNotExist(err) { - err = errors.Errorf("no certificate file found for account %q.\n"+ - "consider creating it as certificate required to create a deployment", clientCtx.FromAddress.String()) - } - return err - } - - log := logger.With("cli", "create") - dd, err := NewDeploymentData(args[0], cmd.Flags(), clientCtx) - if err != nil { - return err - } - - group, _ := errgroup.WithContext(ctx) - - // Listen to on chain events and send the manifest when required - leasesReady := make(chan struct{}, 1) - bids := make(chan mtypes.EventBidCreated, 1) - group.Go(func() error { - if err = ChainEmitter(ctx, clientCtx, DeploymentDataUpdateHandler(dd, bids, leasesReady), SendManifestHander(clientCtx, dd, gClientDir, retryConfiguration)); err != nil && !errors.Is(err, context.Canceled) { - log.Error("error watching events", "err", err) - cancel() - } - return err - }) - - // Send the deployment creation transaction - group.Go(func() error { - if err = TxCreateDeployment(clientCtx, cmd.Flags(), dd); err != nil && !errors.Is(err, context.Canceled) { - log.Error("error creating deployment", "err", err) - cancel() - } - return err - }) - - wfb := newWaitForBids(dd, bids) - group.Go(func() error { - if err = wfb.run(ctx, cancel, clientCtx, cmd.Flags()); err != nil && !errors.Is(err, context.Canceled) { - log.Error("error waiting for bids to be made", "err", err) - cancel() - } - return err - }) - - wfl := newWaitForLeases(dd, gClientDir, retryConfiguration, leasesReady) - // Wait for the leases to be created and then start polling the provider for service availability - group.Go(func() error { - if err = wfl.run(ctx, cancel); err != nil && !errors.Is(err, context.Canceled) { - log.Error("error waiting for services to be ready", "err", err) - cancel() - } - return err - }) - - // This returns "context cancelled" when everything goes OK - err = group.Wait() - cancel() - if err != nil && errors.Is(err, context.Canceled) && wfl.allLeasesOk { - err = nil // Not an actual error to stop on - } - - if err != nil { - return err - } - - // Reset the context - ctx, cancel = context.WithDeadline(cmd.Context(), endAt) - err = wfl.eachService(func(leaseID mtypes.LeaseID, serviceName string) error { - gclient, err := gClientDir.GetClientFromBech32(leaseID.Provider) - if err != nil { - return err - } - - var status *ctypes.ServiceStatus - if err = retry.Do(func() error { - status, err = gclient.ServiceStatus(ctx, leaseID, serviceName) - return err - }); err != nil { - return err - } - - // Encode and show the response - statusEncoded, err := json.MarshalIndent(status, "", " ") - if err != nil { - return nil - } - - _, err = os.Stdout.Write(statusEncoded) - if err != nil { - return err - } - _, err = fmt.Print("\n") - return err - }) - cancel() - - if errors.Is(err, context.Canceled) { - return errDeployTimeout - } - - return err - }, - } - - cmd.Flags().String(flags.FlagChainID, "", "The network chain ID") - if err := viper.BindPFlag(flags.FlagChainID, cmd.Flags().Lookup(flags.FlagChainID)); err != nil { - return nil - } - - cmd.Flags().Duration(FlagTimeout, 150*time.Second, "The max amount of time to wait for deployment status checking process") - if err := viper.BindPFlag(FlagTimeout, cmd.Flags().Lookup(FlagTimeout)); err != nil { - return nil - } - - cmd.Flags().Duration(FlagTick, 500*time.Millisecond, "The time interval at which deployment status is checked") - if err := viper.BindPFlag(FlagTick, cmd.Flags().Lookup(FlagTick)); err != nil { - return nil - } - - flags.AddTxFlagsToCmd(cmd) - dcli.AddDeploymentIDFlags(cmd.Flags()) - common.AddDepositFlags(cmd.Flags(), DefaultDeposit) - - return cmd -} - -func newWaitForBids(dd *DeploymentData, bids <-chan mtypes.EventBidCreated) *waitForBids { - return &waitForBids{ - bids: bids, - groupCount: len(dd.Groups), - } -} - -type waitForBids struct { - groupCount int - bids <-chan mtypes.EventBidCreated -} - -type bidList []mtypes.EventBidCreated - -func (bl bidList) Len() int { - return len(bl) -} - -func (bl bidList) Less(i, j int) bool { - lhs := bl[i] - rhs := bl[j] - - if !lhs.Price.Amount.Equal(rhs.Price.Amount) { - return lhs.Price.Amount.LT(rhs.Price.Amount) - } - - return lhs.ID.Provider < rhs.ID.Provider -} - -func (bl bidList) Swap(i, j int) { - bl[i], bl[j] = bl[j], bl[i] -} - -func (wfb *waitForBids) run(ctx context.Context, cancel context.CancelFunc, clientCtx client.Context, flags *pflag.FlagSet) error { - var bidDeadline <-chan time.Time - allBids := make(map[uint32]bidList) -loop: - for { - select { - case bid := <-wfb.bids: - logger.Debug("Processing bid") - bidsForGroup := allBids[bid.ID.GSeq] - - allBids[bid.ID.GSeq] = append(bidsForGroup, bid) - - // If there is a bid for at least every group, then start the deadline - if nil == bidDeadline && len(allBids) == wfb.groupCount { - logger.Debug("All groups have at least one bid") - // TODO - this value was made up - ticker := time.NewTicker(time.Second * 15) - bidDeadline = ticker.C - defer ticker.Stop() - } - - case <-ctx.Done(): - cancel() - return context.Canceled - case <-bidDeadline: - logger.Info("Done waiting on bids", "qty", len(allBids)) - break loop - } - } - - for gseq, bidsForGroup := range allBids { - // Create the lease, using the lowest price - sort.Sort(bidsForGroup) - winningBid := bidsForGroup[0] - - // check for more than 1 bid having the same price - if len(allBids) > 1 && winningBid.Price.Equal(bidsForGroup[1].Price) { - identical := make(bidList, 0) - for _, bid := range bidsForGroup { - if bid.Price.Equal(winningBid.Price) { - identical = append(identical, bid) - } - } - logger.Info("Multiple bids with identical price", "gseq", gseq, "price", winningBid.Price.String(), "qty", len(identical)) - rng := rand.New(rand.NewSource(int64(winningBid.ID.DSeq))) // nolint - choice := rng.Intn(len(identical)) - - winningBid = identical[choice] - } - - logger.Info("Winning bid", "gseq", gseq, "price", winningBid.Price.String(), "provider", winningBid.ID.Provider) - - ev := &mtypes.MsgCreateLease{ - BidID: winningBid.ID, - } - - _, err := SendMsgs(clientCtx, flags, []sdk.Msg{ev}) - if err != nil { - return err - } - - } - - return nil -} - -func newWaitForLeases(dd *DeploymentData, gClientDir *gateway.ClientDirectory, retryConfiguration []retry.Option, leasesReady <-chan struct{}) *waitForLeases { - return &waitForLeases{ - dd: dd, - gClientDir: gClientDir, - leasesReady: leasesReady, - retryConfiguration: retryConfiguration, - allLeasesOk: false, - } -} - -type leaseAndService struct { - leaseID mtypes.LeaseID - serviceName string -} - -type waitForLeases struct { - dd *DeploymentData - gClientDir *gateway.ClientDirectory - leasesReady <-chan struct{} - retryConfiguration []retry.Option - allLeasesOk bool - services []leaseAndService - lock sync.Mutex -} - -func (wfl *waitForLeases) eachService(fn func(leaseID mtypes.LeaseID, serviceName string) error) error { - for _, entry := range wfl.services { - err := fn(entry.leaseID, entry.serviceName) - if err != nil { - return err - } - } - return nil -} - -var errLeaseNotReady = errors.New("lease not ready") - -// WaitForLeasesAndPollService waits for leases -func (wfl *waitForLeases) run(ctx context.Context, cancel context.CancelFunc) error { - log := logger - - // Wait for signal that expected leases exist - select { - case <-wfl.leasesReady: - - case <-ctx.Done(): - cancel() - return context.Canceled - } - - leases := wfl.dd.Leases() - log.Info("Waiting on leases to be ready", "leaseQuantity", len(leases)) - - var localRetryConfiguration []retry.Option - localRetryConfiguration = append(localRetryConfiguration, wfl.retryConfiguration...) - - retryIf := func(err error) bool { - if retryIfGatewayClientResponseError(err) { - return true - } - - return errors.Is(err, errLeaseNotReady) - } - localRetryConfiguration = append(localRetryConfiguration, retry.RetryIf(retryIf)) - - leaseChecker := func(leaseID mtypes.LeaseID) (func() error, error) { - log.Debug("Checking status of lease", "lease", leaseID) - - gclient, err := wfl.gClientDir.GetClientFromBech32(leaseID.GetProvider()) - if err != nil { - cancel() - return nil, err - } - - servicesChecked := make(map[string]bool) - - return func() error { - err = retry.Do(func() error { - ls, err := gclient.LeaseStatus(ctx, leaseID) - - if err != nil { - log.Debug("Could not get lease status", "lease", leaseID, "err", err) - return err - } - - for serviceName, s := range ls.Services { - checked := servicesChecked[serviceName] - if checked { - continue - } - isOk := s.Available == s.Total - if !isOk { - return fmt.Errorf("%w: service %q has %d / %d available", errLeaseNotReady, serviceName, s.Available, s.Total) - } - servicesChecked[serviceName] = true - log.Info("service ready", "lease", leaseID, "service", serviceName) - } - - // Update the shared data - wfl.lock.Lock() - defer wfl.lock.Unlock() - for serviceName := range ls.Services { - wfl.services = append(wfl.services, leaseAndService{ - leaseID: leaseID, - serviceName: serviceName, - }) - } - return nil - }, localRetryConfiguration...) - if err != nil { - return err - } - - log.Info("lease ready", "leaseID", leaseID) - return nil - }, nil - } - - group, _ := errgroup.WithContext(ctx) - - for _, leaseID := range leases { - fn, err := leaseChecker(leaseID) - if err != nil { - return err - } - group.Go(fn) - } - - err := group.Wait() - if err == nil { // If all return without error, then all leases are ready - wfl.allLeasesOk = true - } - cancel() - return nil -} - -// TxCreateDeployment takes DeploymentData and creates the specified deployment -func TxCreateDeployment(clientCtx client.Context, flags *pflag.FlagSet, dd *DeploymentData) (err error) { - res, err := SendMsgs(clientCtx, flags, []sdk.Msg{dd.MsgCreate()}) - log := logger.With( - "msg", "create-deployment", - ) - - if err != nil || res == nil || res.Code != 0 { - log.Error("tx failed") - return err - } - - log = logger.With( - "hash", res.TxHash, - "code", res.Code, - "codespace", res.Codespace, - "action", "create-deployment", - "dseq", dd.DeploymentID.DSeq, - ) - - log.Info("tx sent successfully") - return nil -} diff --git a/deploy/cmd/deployment-data.go b/deploy/cmd/deployment-data.go deleted file mode 100644 index 69b1e0b665..0000000000 --- a/deploy/cmd/deployment-data.go +++ /dev/null @@ -1,143 +0,0 @@ -package cmd - -import ( - sdk "github.com/cosmos/cosmos-sdk/types" - "github.com/ovrclk/akash/cmd/common" - "sync" - - "github.com/cosmos/cosmos-sdk/client" - manifest "github.com/ovrclk/akash/manifest/v2beta1" - "github.com/ovrclk/akash/sdl" - dcli "github.com/ovrclk/akash/x/deployment/client/cli" - dtypes "github.com/ovrclk/akash/x/deployment/types/v1beta2" - mtypes "github.com/ovrclk/akash/x/market/types/v1beta2" - "github.com/spf13/pflag" -) - -// DeploymentData contains the various IDs involved in a deployment -type DeploymentData struct { - SDL sdl.SDL - Manifest manifest.Manifest - Groups []*dtypes.GroupSpec - DeploymentID dtypes.DeploymentID - OrderID []mtypes.OrderID - LeaseID []mtypes.LeaseID - Version []byte - Deposit sdk.Coin - - sync.RWMutex -} - -// MsgCreate constructor for MsgCreateDeployment -func (dd *DeploymentData) MsgCreate() *dtypes.MsgCreateDeployment { - // Create the deployment message - msg := &dtypes.MsgCreateDeployment{ - ID: dd.DeploymentID, - Groups: make([]dtypes.GroupSpec, 0, len(dd.Groups)), - Version: dd.Version, - Deposit: dd.Deposit, - } - - // Append the groups to the message - for _, group := range dd.Groups { - msg.Groups = append(msg.Groups, *group) - } - - return msg -} - -// ExpectedLeases returns true if all the leases are in state -func (dd *DeploymentData) ExpectedLeases() bool { - dd.RLock() - defer dd.RUnlock() - return len(dd.Groups) == len(dd.LeaseID) -} - -// ExpectedOrders returns true if all the orders are in state -func (dd *DeploymentData) ExpectedOrders() bool { - dd.RLock() - defer dd.RUnlock() - return len(dd.Groups) == len(dd.OrderID) -} - -// AddOrder adds an order for tracking -func (dd *DeploymentData) AddOrder(order mtypes.OrderID) { - dd.Lock() - defer dd.Unlock() - // TODO: Check that order isn't already tracked - dd.OrderID = append(dd.OrderID, order) -} - -// RemoveOrder adds an order for tracking -func (dd *DeploymentData) RemoveOrder(order mtypes.OrderID) { - dd.Lock() - defer dd.Unlock() - var out []mtypes.OrderID - for _, o := range dd.OrderID { - if !order.Equals(o) { - out = append(out, o) - } - } - dd.OrderID = out -} - -// Leases returns a copy of the LeaseIDs tracked -func (dd *DeploymentData) Leases() []mtypes.LeaseID { - dd.RLock() - defer dd.RUnlock() - out := dd.LeaseID - return out -} - -// AddLease adds a lease for tracking -func (dd *DeploymentData) AddLease(lease mtypes.LeaseID) { - dd.Lock() - defer dd.Unlock() - // TODO: Check that lease isn't already tracked - dd.LeaseID = append(dd.LeaseID, lease) -} - -// NewDeploymentData returns a DeploymentData struct initialized from a file and flags -func NewDeploymentData(filename string, flags *pflag.FlagSet, clientCtx client.Context) (*DeploymentData, error) { - sdlSpec, err := sdl.ReadFile(filename) - if err != nil { - return nil, err - } - groups, err := sdlSpec.DeploymentGroups() - if err != nil { - return nil, err - } - mani, err := sdlSpec.Manifest() - if err != nil { - return nil, err - } - ver, err := sdl.ManifestVersion(mani) - if err != nil { - return nil, err - } - id, err := dcli.DeploymentIDFromFlagsForOwner(flags, clientCtx.GetFromAddress()) - - if err != nil { - return nil, err - } - if id.DSeq == 0 { - if id.DSeq, err = dcli.CurrentBlockHeight(clientCtx); err != nil { - return nil, err - } - } - - deposit, err := common.DepositFromFlags(flags) - if err != nil { - return nil, err - } - return &DeploymentData{ - SDL: sdlSpec, - Manifest: mani, - Groups: groups, - DeploymentID: id, - OrderID: make([]mtypes.OrderID, 0), - LeaseID: make([]mtypes.LeaseID, 0), - Version: ver, - Deposit: deposit, - }, nil -} diff --git a/deploy/cmd/event-emitters.go b/deploy/cmd/event-emitters.go deleted file mode 100644 index bc989d5736..0000000000 --- a/deploy/cmd/event-emitters.go +++ /dev/null @@ -1,59 +0,0 @@ -package cmd - -import ( - "context" - - "github.com/cosmos/cosmos-sdk/client" - "github.com/ovrclk/akash/events" - "github.com/ovrclk/akash/pubsub" - "golang.org/x/sync/errgroup" -) - -// EventEmitter is a type that describes event emitter functions -type EventEmitter func(context.Context, ...EventHandler) error - -// ChainEmitter runs the passed EventHandlers just on the on chain event stream -func ChainEmitter(ctx context.Context, clientCtx client.Context, ehs ...EventHandler) (err error) { - // Instantiate and start tendermint RPC client - if err = clientCtx.Client.Start(); err != nil { - return err - } - - // Start the pubsub bus - bus := pubsub.NewBus() - defer bus.Close() - - // Subscribe to the bus events - subscriber, err := bus.Subscribe() - if err != nil { - return err - } - - // Initialize a new error group - group, ctx := errgroup.WithContext(ctx) - - // Publish chain events to the pubsub bus - group.Go(func() error { - return events.Publish(ctx, clientCtx.Client, "akash-deploy", bus) - }) - - // Handle all the events coming out of the bus - group.Go(func() error { - for { - select { - case <-ctx.Done(): - return nil - case <-subscriber.Done(): - return nil - case ev := <-subscriber.Events(): - for _, eh := range ehs { - if err = eh(ev); err != nil { - return err - } - } - } - } - }) - - return group.Wait() -} diff --git a/deploy/cmd/event-handlers.go b/deploy/cmd/event-handlers.go deleted file mode 100644 index f176e2ba42..0000000000 --- a/deploy/cmd/event-handlers.go +++ /dev/null @@ -1,179 +0,0 @@ -package cmd - -import ( - "context" - "fmt" - "github.com/avast/retry-go" - "github.com/spf13/viper" - "net/http" - "time" - - "github.com/cosmos/cosmos-sdk/client" - gateway "github.com/ovrclk/akash/provider/gateway/rest" - "github.com/ovrclk/akash/pubsub" - dtypes "github.com/ovrclk/akash/x/deployment/types/v1beta2" - mtypes "github.com/ovrclk/akash/x/market/types/v1beta2" - "github.com/pkg/errors" -) - -// EventHandler is a type of function that handles events coming out of the event bus -type EventHandler func(pubsub.Event) error - -// SendManifestHander sends manifests on the lease created event -func SendManifestHander(clientCtx client.Context, dd *DeploymentData, gClientDir *gateway.ClientDirectory, retryConfiguration []retry.Option) func(pubsub.Event) error { - pollingRate := viper.GetDuration(FlagTick) - retryIf := func(err error) bool { - isGatewayError := retryIfGatewayClientResponseError(err) - if isGatewayError { - gwError := err.(gateway.ClientResponseError) - - switch gwError.Status { - case http.StatusInternalServerError: - return false // don't retry, the provider can't use this manifest - case http.StatusUnprocessableEntity: - return false // don't retry, the manifest isn't well formed - default: - } - - } - - return isGatewayError - } - - var localRetryConfiguration []retry.Option - localRetryConfiguration = append(localRetryConfiguration, retryConfiguration...) - localRetryConfiguration = append(localRetryConfiguration, retry.RetryIf(retryIf)) - - return func(ev pubsub.Event) (err error) { - addr := clientCtx.GetFromAddress() - log := logger.With("action", "send-manifest") - - evLeaseCreated, ok := ev.(mtypes.EventLeaseCreated) - if ok && addr.String() == evLeaseCreated.ID.Owner && evLeaseCreated.ID.DSeq == dd.DeploymentID.DSeq { - // The provider responds to the same event to get ready for a deployment, so sleep here to - // avoid racing the provider - time.Sleep(pollingRate) - log.Info("sending manifest to provider", "provider", evLeaseCreated.ID.Provider, "dseq", evLeaseCreated.ID.DSeq) - - gclient, err := gClientDir.GetClientFromBech32(evLeaseCreated.ID.Provider) - if err != nil { - return err - } - - return retry.Do(func() error { - err := gclient.SubmitManifest(context.Background(), dd.DeploymentID.DSeq, dd.Manifest) - if err != nil { - log.Debug("send-manifest failed", "lease", evLeaseCreated.ID, "err", err) - } - - return err - }, localRetryConfiguration...) - } - return - } -} - -var errUnexpectedEvent = errors.New("unexpected event") - -// DeploymentDataUpdateHandler updates a DeploymentData and prints relevant events -func DeploymentDataUpdateHandler(dd *DeploymentData, bids chan<- mtypes.EventBidCreated, leasesReady chan<- struct{}) func(pubsub.Event) error { - return func(ev pubsub.Event) (err error) { - addr := dd.DeploymentID.Owner - log := logger.With("addr", addr, "dseq", dd.DeploymentID.DSeq) - - switch event := ev.(type) { - // Handle deployment creation events - case dtypes.EventDeploymentCreated: - if event.ID.Equals(dd.DeploymentID) { - log.Info("deployment created") - } - return - - // Handle deployment update events - case dtypes.EventDeploymentUpdated: - if event.ID.Equals(dd.DeploymentID) { - log.Info("deployment updated") - } - return - - // Handle deployment close events - case dtypes.EventDeploymentClosed: - if event.ID.Equals(dd.DeploymentID) { - log.Error("deployment closed unexpectedly") - - // TODO - exit here - return fmt.Errorf("%w: deployment closed", errUnexpectedEvent) - } - return - - // Handle deployment group close events - case dtypes.EventGroupClosed: - if event.ID.Owner == addr && event.ID.DSeq == dd.DeploymentID.DSeq { - // TODO: Maybe more housekeeping here? - log.Info("deployment group closed") - } - return - - // Handle Order creation events - case mtypes.EventOrderCreated: - if addr == event.ID.Owner && event.ID.DSeq == dd.DeploymentID.DSeq { - dd.AddOrder(event.ID) - log.Info("order for deployment created", "oseq", event.ID.OSeq) - } - return - - // Handle Order close events - case mtypes.EventOrderClosed: - if addr == event.ID.Owner && event.ID.DSeq == dd.DeploymentID.DSeq { - dd.RemoveOrder(event.ID) - log.Info("order for deployment closed", "oseq", event.ID.OSeq) - } - return - - // Handle Bid creation events - case mtypes.EventBidCreated: - if addr == event.ID.Owner && event.ID.DSeq == dd.DeploymentID.DSeq { - log.Info("bid for order created", "oseq", event.ID.OSeq, "price", event.Price) - bids <- event - } - return - - // Handle Bid close events - case mtypes.EventBidClosed: - if addr == event.ID.Owner && event.ID.DSeq == dd.DeploymentID.DSeq { - log.Info("bid for order closed", "oseq", event.ID.OSeq, "price", event.Price) - } - return - - // Handle Lease creation events - case mtypes.EventLeaseCreated: - if addr == event.ID.Owner && event.ID.DSeq == dd.DeploymentID.DSeq { - dd.AddLease(event.ID) - if dd.ExpectedLeases() { - // Write to channel without blocking, it is buffered - select { - case leasesReady <- struct{}{}: - log.Info("All expected leases created") - default: - } - } - - log.Info("lease for order created", "oseq", event.ID.OSeq, "price", event.Price) - } - return - - // Handle Lease close events - case mtypes.EventLeaseClosed: - if addr == event.ID.Owner && event.ID.DSeq == dd.DeploymentID.DSeq { - log.Error("lease for order closed", "oseq", event.ID.OSeq, "price", event.Price) - return fmt.Errorf("%w: lease closed oseq: %v", errUnexpectedEvent, event.ID.OSeq) - } - return - - // Ignore any other event - default: - log.Debug("Ignoring event") - return - } - } -} diff --git a/deploy/cmd/root.go b/deploy/cmd/root.go deleted file mode 100644 index 1a1bd7ff8f..0000000000 --- a/deploy/cmd/root.go +++ /dev/null @@ -1,34 +0,0 @@ -package cmd - -import ( - "os" - - "github.com/spf13/cobra" - "github.com/spf13/viper" - - "github.com/cosmos/cosmos-sdk/client/flags" - "github.com/tendermint/tendermint/libs/log" -) - -var ( - // logger is the logger for the application - logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout)) -) - -// RootCmd represents root command of deploy tool -func RootCmd() *cobra.Command { - cmd := &cobra.Command{ - Use: "deploy", - Short: "Akash deploy tool commands", - SilenceUsage: true, - } - - cmd.PersistentFlags().String(flags.FlagNode, "http://localhost:26657", "The node address") - if err := viper.BindPFlag(flags.FlagNode, cmd.PersistentFlags().Lookup(flags.FlagNode)); err != nil { - return nil - } - - cmd.AddCommand(createCmd()) - - return cmd -} diff --git a/deploy/cmd/utils.go b/deploy/cmd/utils.go deleted file mode 100644 index ef45708e3d..0000000000 --- a/deploy/cmd/utils.go +++ /dev/null @@ -1,68 +0,0 @@ -package cmd - -import ( - "github.com/cosmos/cosmos-sdk/client" - "github.com/cosmos/cosmos-sdk/client/tx" - sdk "github.com/cosmos/cosmos-sdk/types" - "github.com/ovrclk/akash/sdkutil" - "github.com/spf13/pflag" -) - -// SendMsgs sends given sdk messages -func SendMsgs(clientCtx client.Context, flags *pflag.FlagSet, datagrams []sdk.Msg) (res *sdk.TxResponse, err error) { - // validate basic all the msgs - for _, msg := range datagrams { - if err := msg.ValidateBasic(); err != nil { - return res, err - } - } - - return BuildAndBroadcastTx(clientCtx, flags, datagrams) -} - -// BuildAndBroadcastTx takes messages and builds, signs and marshals a sdk.Tx to prepare it for broadcast -func BuildAndBroadcastTx(clientCtx client.Context, flags *pflag.FlagSet, msgs []sdk.Msg) (*sdk.TxResponse, error) { - txf := tx.NewFactoryCLI(clientCtx, flags). - WithTxConfig(clientCtx.TxConfig). - WithAccountRetriever(clientCtx.AccountRetriever) - - keyname := clientCtx.GetFromName() - info, err := txf.Keybase().Key(keyname) - if err != nil { - return nil, err - } - - txf, err = sdkutil.PrepareFactory(clientCtx, txf) - if err != nil { - return nil, err - } - - // If users pass gas adjustment, then calculate gas - _, adjusted, err := tx.CalculateGas(clientCtx, txf, msgs...) - if err != nil { - return nil, err - } - - // Set the gas amount on the transaction factory - txf = txf.WithGas(adjusted) - - // Build the transaction builder - txb, err := tx.BuildUnsignedTx(txf, msgs...) - if err != nil { - return nil, err - } - - // Attach the signature to the transaction - err = tx.Sign(txf, info.GetName(), txb, true) - if err != nil { - return nil, err - } - - // Generate the transaction bytes - txBytes, err := clientCtx.TxConfig.TxEncoder()(txb.GetTx()) - if err != nil { - return nil, err - } - - return clientCtx.BroadcastTxSync(txBytes) -}