Skip to content

Commit

Permalink
review update. Now only put status updates into subscribe channel.
Browse files Browse the repository at this point in the history
  • Loading branch information
nullfunc committed Jun 19, 2024
1 parent 795605e commit 627705a
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 19 deletions.
16 changes: 14 additions & 2 deletions src/cmd/cli/command/servicemonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,26 @@ func waitServiceStatus(ctx context.Context, targetStatus cli.ServiceStatus, serv
}

// set up service status subscription (non-blocking)
serviceStatusChan, err := cli.Subscribe(ctx, client, serviceList)
subscribeServiceStatusChan, err := cli.Subscribe(ctx, client, serviceList)
if err != nil {
term.Debugf("error subscribing to service status: %v", err)
return err
}

serviceStatus := make(map[string]string, len(serviceList))
for _, name := range serviceList {
serviceStatus[name] = string(cli.ServiceUnknown)
}

// monitor for when all services are completed to end this command
for serviceStatus := range serviceStatusChan {
for newStatus := range subscribeServiceStatusChan {
if _, ok := serviceStatus[newStatus.Name]; !ok {
term.Debugf("unexpected service %s update", newStatus.Name)
continue
}

serviceStatus[newStatus.Name] = newStatus.Status

if allInStatus(targetStatus, serviceStatus) {
for _, sInfo := range serviceInfos {
sInfo.Status = string(targetStatus)
Expand Down
29 changes: 12 additions & 17 deletions src/pkg/cli/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,24 @@ import (
defangv1 "github.com/DefangLabs/defang/src/protos/io/defang/v1"
)

func Subscribe(ctx context.Context, client client.Client, services []string) (<-chan map[string]string, error) {
type SubscribeServiceStatus struct {
Name string
Status string
}

func Subscribe(ctx context.Context, client client.Client, services []string) (<-chan SubscribeServiceStatus, error) {
if len(services) == 0 {
return nil, fmt.Errorf("no services specified")
}

serviceStatus := make(map[string]string, len(services))
normalizedServiceNameToServiceName := make(map[string]string, len(services))

for i, service := range services {
services[i] = NormalizeServiceName(service)
normalizedServiceNameToServiceName[services[i]] = service
serviceStatus[service] = string(ServiceUnknown)
}

statusChan := make(chan map[string]string, len(services))
statusChan := make(chan SubscribeServiceStatus, len(services))
if DoDryRun {
defer close(statusChan)
return statusChan, ErrDryRun
Expand Down Expand Up @@ -67,23 +70,15 @@ func Subscribe(ctx context.Context, client client.Client, services []string) (<-
term.Debugf("Unknown service %s in subscribe response\n", servInfo.Service.Name)
continue
}
serviceStatus[serviceName] = servInfo.Status

//make a copy to be put into channel
serviceStatusCopy := cloneServiceStatus(serviceStatus)
status := SubscribeServiceStatus{
Name: serviceName,
Status: servInfo.Status,
}

statusChan <- serviceStatusCopy
statusChan <- status
term.Debugf("service %s with status %s\n", serviceName, servInfo.Status)
}
}()

return statusChan, nil
}

func cloneServiceStatus(serviceStatus map[string]string) map[string]string {
serviceStatusCopy := make(map[string]string, len(serviceStatus))
for k, v := range serviceStatus {
serviceStatusCopy[k] = v
}
return serviceStatusCopy
}

0 comments on commit 627705a

Please sign in to comment.